grafana/mon/realtime_comms.py

111 lines
4.6 KiB
Python

import sys
import signal
import logging
import datetime
import subprocess
from threading import Timer
from ibapi.client import EClient, ExecutionFilter, Contract
from ibapi.wrapper import EWrapper, CommissionReport, Execution
from ibapi.utils import iswrapper
import psycopg2
from pytz import timezone
# Optional logging setup, currently disabled (the script reports via print()).
#lfmt = '%(asctime)s.%(msecs)03d\t%(levelname)-8s\t%(message)s'
#logging.basicConfig(format=lfmt, encoding='utf-8', level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S')
# Account ids of interest. NOTE(review): not referenced anywhere in the visible
# part of this file -- confirm whether it is still needed.
tgtAccts = [ 'U13232746' ]
# Delay, in seconds, handed to threading.Timer between execution polls.
EVERY=5
# Zone used to localize the naive execution timestamps parsed in shipExec.
TZ_NY = timezone('US/Eastern')
# Zone the execution-filter time is converted to before it is formatted.
TZ_UTC = timezone('UTC')
def get_db_connection():
    """Open and return a new psycopg2 connection to the ``live_acctval`` database.

    Connects to the server on 127.0.0.1 as user ``joseph``; no password is
    passed here, so authentication is left to whatever psycopg2/libpq is
    configured to use.
    """
    conn_params = {
        'host': '127.0.0.1',
        'database': 'live_acctval',
        'user': 'joseph',
    }
    return psycopg2.connect(**conn_params)
class IBapi(EWrapper, EClient):
    """IB API client that records per-execution commissions in Postgres.

    Every ``EVERY`` seconds the client asks TWS for executions newer than
    ``execFilter.time``.  An execution's timestamp arrives via ``execDetails``
    and its commission via ``commissionReport``, in either order; once both
    halves are known the pair is written to the ``comms`` table by
    ``shipExec``.

    NOTE(review): ``totalComms`` is only ever loaded at startup and then
    incremented; nothing here resets it when the calendar day changes while
    the process keeps running.
    """

    def __init__(self):
        EClient.__init__(self, self)
        self.dbconn = get_db_connection()
        # execId -> {'date': execution time string or None,
        #            'comms': commission or None}
        self.execs = {}
        # Running total stored in comms.day_cumulative.
        self.totalComms = 0
        # Filter passed to reqExecutions; its .time is kept in UTC.
        self.execFilter = ExecutionFilter()
        # Pending poll timer (None until the first poll is scheduled).
        self.t = None

    def getInitialState(self):
        """Resume from today's most recent ``comms`` row, then start polling.

        If a row exists, the running total is restored from it and the
        execution filter is set to one second after that row's timestamp.
        """
        # `with self.dbconn` ends the read transaction; the cursor context
        # closes the cursor even if the query raises.
        with self.dbconn:
            with self.dbconn.cursor() as cur:
                cur.execute('SELECT created_at,day_cumulative FROM comms WHERE created_at > %s ORDER BY id DESC LIMIT 1', (datetime.date.today(),))
                res = cur.fetchone()
        if res is not None:
            lastAt, cumulative = res
            print('last entry in comms is at %s, cumulative %s' % (lastAt, cumulative))
            resumeAt = lastAt + datetime.timedelta(seconds=1)
            if resumeAt.tzinfo is not None:
                # shipExec formats the filter time in UTC; do the same here so
                # a non-UTC database session does not shift the filter.
                resumeAt = resumeAt.astimezone(TZ_UTC)
            # NOTE(review): a naive timestamp is formatted unchanged and is
            # therefore assumed to already be UTC -- confirm the column type.
            self.execFilter.time = resumeAt.strftime('%Y%m%d-%H:%M:%S')
            self.totalComms = float(cumulative)
        self.requestExecs()

    def shipExec(self, execId):
        """Insert the commission of ``execId`` and advance the filter time.

        Does nothing while the record is incomplete or when the commission is
        not positive.  The running total is updated only after the INSERT has
        been committed, so a failed insert leaves it unchanged.
        """
        entry = self.execs[execId]
        comms = entry['comms']
        if comms is None or entry['date'] is None:
            # One half of the record has not arrived yet.
            return
        if comms <= 0:
            return
        newTotal = self.totalComms + comms
        print('shipped exec %s for trade @ %s, comms %s, total %s' % (execId, entry['date'], comms, newTotal))
        # Execution times look like '20240726 13:15:56 US/Eastern'.
        timeOfExec = datetime.datetime.strptime(entry['date'], '%Y%m%d %H:%M:%S US/Eastern')
        tzLocal = TZ_NY.localize(timeOfExec)
        print('datetime+localize = %s' % tzLocal)
        # Commits on success, rolls back if the INSERT raises.
        with self.dbconn:
            with self.dbconn.cursor() as cur:
                cur.execute('INSERT INTO comms (created_at, amount, day_cumulative) VALUES (%s, %s, %s)', (tzLocal, comms, newTotal))
        self.totalComms = newTotal
        self.execFilter.time = tzLocal.astimezone(TZ_UTC).strftime('%Y%m%d-%H:%M:%S')
        print('set execFilter time to %s' % self.execFilter.time)

    @iswrapper
    def nextValidId(self, orderId: int):
        """Start the initial DB lookup and polling when this callback fires."""
        self.getInitialState()

    def requestExecs(self):
        """Request executions matching the current filter and re-arm the timer.

        Polling stops once the client is no longer connected.
        """
        if not self.isConnected():
            # Without this the timer chain would keep re-arming itself after
            # disconnect() and keep the process alive indefinitely.
            return
        print('request executions after time %s' % self.execFilter.time)
        self.reqExecutions(10001, self.execFilter)
        self.t = Timer(EVERY, self.requestExecs)
        # Daemon thread: a pending poll must never block interpreter exit.
        self.t.daemon = True
        self.t.start()

    @iswrapper
    def execDetails(self, reqId: int, contract: Contract, execution: Execution):
        """Record an execution's timestamp; ship it if its commission is known."""
        entry = self.execs.get(execution.execId)
        if entry is None:
            self.execs[execution.execId] = {'date': execution.time,
                                            'comms': None}
        elif entry['date'] is None:
            # The commission report arrived first; the record is now complete.
            entry['date'] = execution.time
            print('ship exec %s from ED' % execution.execId)
            self.shipExec(execution.execId)

    @iswrapper
    def commissionReport(self, commissionReport: CommissionReport):
        """Record an execution's commission; ship it if its timestamp is known."""
        super().commissionReport(commissionReport)
        entry = self.execs.get(commissionReport.execId)
        if entry is None:
            self.execs[commissionReport.execId] = {'date': None,
                                                   'comms': commissionReport.commission}
        elif entry['comms'] is None:
            # The execution details arrived first; the record is now complete.
            entry['comms'] = commissionReport.commission
            print('ship exec %s from CR' % commissionReport.execId)
            self.shipExec(commissionReport.execId)
app = IBapi()


def signal_handler(sig, frame):
    """Handle SIGINT: stop polling, disconnect from TWS and exit.

    The pending poll timer is cancelled as well; otherwise its thread would
    outlive ``sys.exit`` and keep re-requesting executions on a closed
    connection, so the process would never terminate.
    """
    print('You pressed Ctrl+C!')
    app.disconnect()
    # `t` only exists once the first poll has been scheduled.
    pending = getattr(app, 't', None)
    if pending is not None:
        pending.cancel()
    sys.exit(0)


signal.signal(signal.SIGINT, signal_handler)
#app.connect('127.0.0.1', 7497, 123)
app.connect('tws2', 7496, 523)
app.run()