1

我能够从该站点上的 IB 文档/示例和论坛中拼凑出一个脚本。我得到了我想要的单个符号的输出,但是,如果我使用股票列表,我无法找到将股票代码传递给 DF 输出文件的方法。我的解决方法是创建一个使用列表序列的字典(见下文),但是每次使符号几乎毫无意义时,IB 的 api 的输出都会略有变化。我在下面使用的列表通常有 20 多个名称,但可能会更改,我将其删减以使其更易于查看。

@Brian/and 或其他开发人员,如果有一种方法可以为每个符号调用创建一个唯一的 ID/序列并将其标记到返回的数据上,那么我可以利用字典来应用该符号。在另一个论坛中,您传递了 n_id = n_id +1 的行,如果可以应用它并链接到按列表顺序完成的每个特定调用,那么这可以工作吗?

from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
import pandas as pd
import threading
import time
from datetime import timedelta
import datetime

class IBapi(EWrapper, EClient):
    def __init__(self):
        EClient.__init__(self, self)
        self.data = [] #Initialize variable to store candle

    def historicalData(self, reqId, bar):
        #print(f'Time: {bar.date} Close: {bar.close} Volume: {bar.volume}',reqId)
        self.data.append([bar.date, bar.close, bar.volume, reqId])
     
def run_loop():
    app.run()

app = IBapi()
app.connect('127.0.0.1', 7496, 123)

#Start the socket in a thread
api_thread = threading.Thread(target=run_loop, daemon=True)
api_thread.start()

time.sleep(1) #Sleep interval to allow time for connection to server

symbols = ['SPY','MSFT','GOOG','AAPL','QQQ','IWM','TSLA']

for sym in symbols:
    contract = Contract()
    contract.symbol = str(sym) 
    contract.secType = "STK"
    contract.exchange = "SMART"
    contract.currency = "USD"
    #contract.primaryExchange = "ISLAND"
    app.reqHistoricalData(1, contract, "", "1 D", "10 mins", "ADJUSTED_LAST", 1, 2, False, [])

 time.sleep(5) #sleep to allow enough time for data to be returned

df = pd.DataFrame(app.data, columns=['DateTime', 'ADJUSTED_LAST','Volume','reqId'])
df['DateTime'] = pd.to_datetime(df['DateTime'],unit='s') #,unit='s') 

df['Count'] = df.groupby('DateTime').cumcount()+1
sym_dict = {1:'SPY',2:'MSFT',3:'GOOG',4:'AAPL',5:'QQQ',6:'IWM',7:'TSLA'}

df['Ticker'] = df['Count'].map(sym_dict)

print(df)

#edit,添加@Brian的详细信息:

from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
import pandas as pd
import time
from datetime import timedelta
import datetime

start = datetime.datetime.utcnow()

class IBapi(EWrapper, EClient):
    def __init__(self):
        EClient.__init__(self, self)
        self.data = [] 

def error(self, reqId, errorCode, errorString):
    print("Error. Id: " , reqId, " Code: " , errorCode , " Msg: " , errorString)

def historicalData(self, reqId, bar):
    self.data.append([bar.date, bar.close, bar.volume, sym_dict[reqId]])
    print("HistoricalData. ReqId:", sym_dict[reqId], "BarData.", bar)
 
# include this callback to track progress and maybe disconnectwhen all are finished
def historicalDataEnd(self, reqId: int, start: str, end: str):
    print("finished", sym_dict[reqId])

def run_loop():
    app.run()

app = IBapi()
app.connect('127.0.0.1', 7496, 123)

# you should wait for nextValidId instead of sleeping, what if it takes more than 1 second? @john: how do i do this?
time.sleep(5) @john: how do i do this? wait for nextValidId?

symbols = ['SPY','MSFT','GOOG','AAPL','QQQ','IWM','TSLA']

reqId = 1
sym_dict = {}
for sym in symbols:
    contract = Contract()
    contract.symbol = str(sym) 
    sym_dict[reqId] = sym
    contract.secType = "STK"
    contract.exchange = "SMART"
    contract.currency = "USD"
    #contract.primaryExchange = "ISLAND" # you may need this for msft
    app.reqHistoricalData(reqId, contract, "", "1 D", "10 mins", "ADJUSTED_LAST", 1, 2, False, [])
    reqId += 1
    time.sleep(5)

df = pd.DataFrame(app.data, columns=['DateTime', 'ADJUSTED_LAST','Volume','sym'])
df['DateTime'] = pd.to_datetime(df['DateTime'],unit='s') #,unit='s') 
df = df.set_index(['sym','DateTime']).sort_index()
print(df)
app.disconnect()
4

1 回答 1

3

您只需要维护一个 reqId 和符号的字典。

我不确定一个 DataFrame 是存储数据的最佳方式,但如果你这样做了,那么设置一个多索引。决定你想要多少数据以及如何将它存储在磁盘上,然后决定数据结构。为了简单起见,我建议使用 csv 来提高速度或使用 sqlite。熊猫可以处理任何一个。

我删除了您的评论并添加了一些我自己的评论。

from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
import pandas as pd
import threading
import time
from datetime import timedelta
import datetime

# I added this code to get fake data, works wtihout tws running
from ibapi.common import BarData
from random import random
start = datetime.datetime.utcnow()
def fake_data(reqId, ib):
    last = reqId*10
    for i in range(60, 0, -10):
        bar = BarData();
        bar.date = start - timedelta(minutes=i)
        last += random() - 0.5
        bar.close = last
        bar.volume = reqId * 1000
        ib.historicalData(reqId, bar)
    ib.historicalDataEnd(reqId,"","")
    
class IBapi(EWrapper, EClient):
    def __init__(self):
        EClient.__init__(self, self)
        self.data = [] 

    #always include this for important messages, also turn on api logging in TWS/IBG    
    def error(self, reqId, errorCode, errorString):
        print("Error. Id: " , reqId, " Code: " , errorCode , " Msg: " , errorString)

    def historicalData(self, reqId, bar):
        self.data.append([bar.date, bar.close, bar.volume, sym_dict[reqId]])
     
    # include this callback to track progress and maybe disconnectwhen all are finished
    def historicalDataEnd(self, reqId: int, start: str, end: str):
        print("finished", sym_dict[reqId])
        
def run_loop():
    app.run()

app = IBapi()
app.connect('127.0.0.1', 7496, 123)

# threading is needed only if you plan to interact after run is called
# this is a good way if you use a ui like jupyter
api_thread = threading.Thread(target=run_loop, daemon=True)
api_thread.start()

# you should wait for nextValidId instead of sleeping, what if it takes more than 1 second?
time.sleep(1)

symbols = ['SPY','MSFT','GOOG','AAPL','QQQ','IWM','TSLA']

reqId = 1
sym_dict = {}
for sym in symbols:
    contract = Contract()
    contract.symbol = str(sym) 
    sym_dict[reqId] = sym
    contract.secType = "STK"
    contract.exchange = "SMART"
    contract.currency = "USD"
    #contract.primaryExchange = "ISLAND" # you may need this for msft
    #app.reqHistoricalData(reqId, contract, "", "1 D", "10 mins", "ADJUSTED_LAST", 1, 2, False, [])
    fake_data(reqId, app)
    reqId += 1
    #now you need to sleep(10) to make sure you don't get a pacing error for too many requests
    
# don't sleep, use historicalDataEnd to know when finished
time.sleep(5)

df = pd.DataFrame(app.data, columns=['DateTime', 'ADJUSTED_LAST','Volume','sym'])
df['DateTime'] = pd.to_datetime(df['DateTime'],unit='s')

#make an index and sort
df = df.set_index(['sym','DateTime']).sort_index()
# now you can use the indexes
print(df.loc[("SPY","2021")])

#don't forget to disconnect somewhere or the clientId will still be in use
于 2021-01-02T17:09:45.853 回答