我想执行流程。
scannerData
使用API每 30 秒获取一个代码列表(IB 官方声明)- 使用
reqHistoricalData
API获取这些代码的历史数据获取的数据scannerData
没有交易量和价格数据,因此我必须在获取reqHistoricalData
代码列表后从中检索历史数据。
我遇到的问题是,即使尚未完成,它也会被执行,并且它将继续每 30 秒扫描一次数据full_candle_df = pd.concat(self.__concat_df_list, axis=1)
。self.__full_candle_df.to_csv('my path')
reqAsyncHistoricalData
我知道historicalDataEnd
并且scannerDataEnd
喜欢回调函数来识别是否完全收到所有蜡烛数据和扫描仪数据。但对于这种情况historicalDataEnd
,只是回调函数,以确认是否仅针对 ONE TICKER 完整接收所有蜡烛数据。如您所见,我有一个 for 循环来historicalDataEnd
异步调用多个。当所有代码的蜡烛数据被完全检索到时,如何创建回调以触发?有类似 javascript 的东西promise
吗?
这是我的代码:
from constant.indicator import Indicator
from ibapi import contract
from ibapi.client import EClient
from ibapi.common import TickerId
from ibapi.contract import Contract, ContractDetails
from ibapi.scanner import ScannerSubscription
from ibapi.wrapper import EWrapper
import pandas as pd
from pytz import timezone
import time
class IBConnector(EWrapper, EClient):
def __init__(self):
EWrapper.__init__(self)
EClient.__init__(self, self)
self.__scanner_ticker_list = []
self.__datetime_list = []
self.__ohlcv_data_list = []
self.__concat_df_list = []
self.__full_candle_df = None
def error(self, reqId: TickerId, errorCode: int, errorString: str):
super().error(reqId, errorCode, errorString)
''' Callbacks to EWrapper with errorId as -1 do not represent true 'errors' but only
notification that a connector has been made successfully to the IB market data farms. '''
if errorCode == -1:
connect_success_msg = f'reqId: {reqId} Connect Success, {errorString}'
logger.debug(connect_success_msg)
else:
error_msg = f'reqId: {reqId}, Error Cause: {errorString}'
logger.exception(error_msg)
def clean_temp_candle_data(self):
self.__datetime_list = []
self.__ohlcv_data_list = []
def historicalData(self, reqId, bar):
open = bar.open
high = bar.high
low = bar.low
close = bar.close
volume = bar.volume * 100
dt = bar.date
self.__ohlcv_data_list.append([open, high, low, close, volume])
self.__datetime_list.append(dt)
#Marks the ending of historical bars reception,
def historicalDataEnd(self, reqId: int, start: str, end: str):
ticker_to_indicator_column = pd.MultiIndex.from_product([[self.__scanner_ticker_list[reqId - 1]], [Indicator.OPEN, Indicator.HIGH, Indicator.LOW, Indicator.CLOSE, Indicator.VOLUME]])
datetime_index = pd.DatetimeIndex(self.__datetime_list)
individual_ticker_candle_df = pd.DataFrame(self.__ohlcv_data_list, columns=ticker_to_indicator_column, index=datetime_index)
self.__concat_df_list.append(individual_ticker_candle_df)
self.clean_temp_candle_data()
def reqAsyncHistoricalData(ticker_list: list):
for index, ticker in enumerate(ticker_list):
contract = Contract()
contract.symbol = ticker
contract.secType = 'STK'
contract.exchange = 'SMART'
contract.currency = 'USD'
self.reqHistoricalData(index + 1, contract, '', '1 D', "1 day", "TRADES", 0, 1, False, [])
#API Scanner subscriptions update every 30 seconds, just as they do in TWS.
#The returned results to scannerData simply consists of a list of contracts, no market data field (bid, ask, last, volume, ...)
#If these are desired they have to be requested separately with the reqMktData function
def scannerData(self, reqId: int, rank: int, contractDetails: ContractDetails, distance: str, benchmark: str, projection: str, legsStr: str):
print('scanning data')
self.__scanner_ticker_list.append(contractDetails.contract.symbol)
#This function is triggered every 30 seconds
def scannerDataEnd(self, reqId: int):
print('scan end start')
self.__concat_df_list = []
logger.debug(f'reqId: {reqId}, Scanner Filtered Results: {self.__scanner_ticker_list}')
reqAsyncHistoricalData(self.__scanner_ticker_list)
#Start writing csv only after reqAsyncHistoricalData finish
#Expect sth like reqAsyncHistoricalData_promise.then(...) to execute the following code
full_candle_df = pd.concat(self.__concat_df_list, axis=1)
self.__full_candle_df.to_csv('my path')
#Expect sth like csv_promise().then(is_csv_done = False)
#to prevent scan data continues if writing csv has not finished yet
while is_csv_done:
sleep(0.1)
def main():
connector = IBConnector()
connector.connect('127.0.0.1', 7496, 0)
#Filter Criteria
scannerFilter = ScannerSubscription()
#Top Gainers
scannerFilter.scanCode = 'TOP_PERC_GAIN'
#US Stocks
scannerFilter.instrument = 'STK'
#Exclude OTC Stocks
scannerFilter.locationCode = 'STK.US.MAJOR'
scannerFilter.abovePrice = 0.3
scannerFilter.aboveVolume = 10000
scannerFilter.numberOfRows = 20
connector.reqScannerSubscription(0, scannerFilter, [], [])
connector.run()
main()