0

我想执行流程。

  1. scannerData使用API每 30 秒获取一个代码列表(IB 官方声明)
  2. 使用reqHistoricalDataAPI获取这些代码的历史数据获取的数据scannerData没有交易量和价格数据,因此我必须在获取reqHistoricalData代码列表后从中检索历史数据。

我遇到的问题是,即使尚未完成,它也会被执行,并且它将继续每 30 秒扫描一次数据full_candle_df = pd.concat(self.__concat_df_list, axis=1)self.__full_candle_df.to_csv('my path')reqAsyncHistoricalData

我知道historicalDataEnd并且scannerDataEnd喜欢回调函数来识别是否完全收到所有蜡烛数据和扫描仪数据。但对于这种情况historicalDataEnd,只是回调函数,以确认是否仅针对 ONE TICKER 完整接收所有蜡烛数据。如您所见,我有一个 for 循环来historicalDataEnd异步调用多个。当所有代码的蜡烛数据被完全检索到时,如何创建回调以触发?有类似 javascript 的东西promise吗?

这是我的代码:

from constant.indicator import Indicator
from ibapi import contract
from ibapi.client import EClient
from ibapi.common import TickerId
from ibapi.contract import Contract, ContractDetails
from ibapi.scanner import ScannerSubscription
from ibapi.wrapper import EWrapper

import pandas as pd
from pytz import timezone
import time

class IBConnector(EWrapper, EClient):
    def __init__(self):
        EWrapper.__init__(self)
        EClient.__init__(self, self)
        self.__scanner_ticker_list = []
        self.__datetime_list = []
        self.__ohlcv_data_list = []
        self.__concat_df_list = []
        self.__full_candle_df = None
    
    def error(self, reqId: TickerId, errorCode: int, errorString: str):
        super().error(reqId, errorCode, errorString)

        ''' Callbacks to EWrapper with errorId as -1 do not represent true 'errors' but only 
        notification that a connector has been made successfully to the IB market data farms. '''
        if errorCode == -1:
            connect_success_msg = f'reqId: {reqId} Connect Success, {errorString}'
            logger.debug(connect_success_msg)
        else:
            error_msg = f'reqId: {reqId}, Error Cause: {errorString}'
            logger.exception(error_msg)

    def clean_temp_candle_data(self):
        self.__datetime_list = []
        self.__ohlcv_data_list = []
 
    def historicalData(self, reqId, bar):
        open = bar.open
        high = bar.high
        low = bar.low
        close = bar.close
        volume = bar.volume * 100
        dt = bar.date

        self.__ohlcv_data_list.append([open, high, low, close, volume])
        self.__datetime_list.append(dt)

    #Marks the ending of historical bars reception,
    def historicalDataEnd(self, reqId: int, start: str, end: str):
        ticker_to_indicator_column = pd.MultiIndex.from_product([[self.__scanner_ticker_list[reqId - 1]], [Indicator.OPEN, Indicator.HIGH, Indicator.LOW, Indicator.CLOSE, Indicator.VOLUME]])
        datetime_index = pd.DatetimeIndex(self.__datetime_list)
        
        individual_ticker_candle_df = pd.DataFrame(self.__ohlcv_data_list, columns=ticker_to_indicator_column, index=datetime_index)
        self.__concat_df_list.append(individual_ticker_candle_df)
        self.clean_temp_candle_data()
    
    def reqAsyncHistoricalData(ticker_list: list):
        for index, ticker in enumerate(ticker_list):
            contract = Contract()
            contract.symbol = ticker
            contract.secType = 'STK'
            contract.exchange = 'SMART'
            contract.currency = 'USD'

            self.reqHistoricalData(index + 1, contract, '', '1 D', "1 day", "TRADES", 0, 1, False, [])

    #API Scanner subscriptions update every 30 seconds, just as they do in TWS.
    #The returned results to scannerData simply consists of a list of contracts, no market data field (bid, ask, last, volume, ...)
    #If these are desired they have to be requested separately with the reqMktData function
    def scannerData(self, reqId: int, rank: int, contractDetails: ContractDetails, distance: str, benchmark: str, projection: str, legsStr: str):
        print('scanning data')
        self.__scanner_ticker_list.append(contractDetails.contract.symbol)
    
    #This function is triggered every 30 seconds
    def scannerDataEnd(self, reqId: int):
        print('scan end start')
        self.__concat_df_list = []
        logger.debug(f'reqId: {reqId}, Scanner Filtered Results: {self.__scanner_ticker_list}')
        
        reqAsyncHistoricalData(self.__scanner_ticker_list)
        
        #Start writing csv only after reqAsyncHistoricalData finish
        #Expect sth like reqAsyncHistoricalData_promise.then(...) to execute the following code
        full_candle_df = pd.concat(self.__concat_df_list, axis=1)
        self.__full_candle_df.to_csv('my path')
        
        #Expect sth like csv_promise().then(is_csv_done = False)
        #to prevent scan data continues if writing csv has not finished yet
        while is_csv_done:
            sleep(0.1)
        


def main():
    connector = IBConnector()
    connector.connect('127.0.0.1', 7496, 0)
    
    #Filter Criteria
    scannerFilter = ScannerSubscription()
    #Top Gainers
    scannerFilter.scanCode = 'TOP_PERC_GAIN'
    #US Stocks
    scannerFilter.instrument = 'STK'
    #Exclude OTC Stocks
    scannerFilter.locationCode = 'STK.US.MAJOR' 
    scannerFilter.abovePrice = 0.3
    scannerFilter.aboveVolume = 10000
    scannerFilter.numberOfRows = 20
    
    connector.reqScannerSubscription(0, scannerFilter, [], [])
    connector.run()

main()
4

0 回答 0