3

我正在尝试将 pandas 模块应用到我的代码中,以便重新组织从 IB TWS 服务器收到的消息。

代码是

from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract


class MyWrapper(EWrapper):

    def nextValidId(self, orderId:int):
        print("Setting nextValidOrderId: %d", orderId)
        self.nextValidOrderId = orderId
        self.start()

    def historicalData(self, reqId, bar):
        print("HistoricalData. ", reqId, "Date:", bar.date, "Open:", bar.open, "High:", bar.high, "Low:", bar.low, "Close:", bar.close, "Volume:", bar.volume, "Average:", bar.average, "Count:", bar.barCount)

    def historicalDataUpdate(self, reqId, bar):
        print("HistoricalDataUpdate. ", reqId, "Date:", bar.date, "Open:", bar.open, "High:", bar.high, "Low:", bar.low, "Close:", bar.close, "Volume:", bar.volume, "Average:", bar.average, "Count:", bar.barCount)

    def error(self, reqId, errorCode, errorString):
        print("Error. Id: " , reqId, " Code: " , errorCode , " Msg: " , errorString)

    def start(self):
        queryTime = ""

        contract = Contract()
        contract.secType = "STK"
        contract.symbol = "NIO"
        contract.currency = "USD"
        contract.exchange = "SMART"

        app.reqHistoricalData(1, contract, queryTime, "1 D", "5 secs", "TRADES", 0, 1, True, [])

app = EClient(MyWrapper())
app.connect("127.0.0.1", 7496, clientId=123)
app.run()

此代码检索给定股票的历史数据,然后返回最新更新。

我面临的问题是返回的消息是这样组织的

HistoricalDataUpdate.  1 Date: 20200708  08:31:00 Open: 14.17 High: 14.17 Low: 14.17 Close: 14.17 Volume: -1 Average: 14.15 Count: -1

当我尝试以重新组织的方式检索数据时,例如

HistoricalDataUpdate.  1 Date:            Open:  High:  Low:   Close:  Volume:  Average:  Count:
                       20200708  08:31:00 14.17  14.17  14.17  14.17   -1       14.15     -1

帮助将不胜感激。

4

2 回答 2

2

回调为您提供 ibapi.common.BarData ,您可以读取它的 vars 以获取 dict 等{date:..., open:123...}

Pandas 可以从 dicts 列表中创建数据框,因此将它们存储在列表中

也许您希望将日期作为索引,pandas 也可以这样做,令人惊讶的是它可以读取格式。

完成后,您可以将数据保存在 csv 文件中。

from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract
import pandas as pd

class MyWrapper(EWrapper):
    def __init__(self):
        self.data = []
        self.df=None
        
    def nextValidId(self, orderId:int):
        print("Setting nextValidOrderId: %d", orderId)
        self.nextValidOrderId = orderId
        self.start()

    def historicalData(self, reqId, bar):
        self.data.append(vars(bar));
        
    def historicalDataUpdate(self, reqId, bar):
        line = vars(bar)
        # pop date and make it the index, add rest to df
        # will overwrite last bar at that same time
        self.df.loc[pd.to_datetime(line.pop('date'))] = line
        
    def historicalDataEnd(self, reqId: int, start: str, end: str):
        print("HistoricalDataEnd. ReqId:", reqId, "from", start, "to", end)
        self.df = pd.DataFrame(self.data)
        self.df['date'] = pd.to_datetime(self.df['date'])
        self.df.set_index('date', inplace=True)
        
    def error(self, reqId, errorCode, errorString):
        print("Error. Id: " , reqId, " Code: " , errorCode , " Msg: " , errorString)

    def start(self):
        queryTime = ""
        
        # so everyone can get data use fx
        fx = Contract()
        fx.secType = "CASH" 
        fx.symbol = "USD"
        fx.currency = "JPY"
        fx.exchange = "IDEALPRO"
        
        # setting update to 1 minute still sends an update every tick? but timestamps are 1 min
        # I don't think keepUpToDate sends a realtimeBar every 5 secs, just updates the last bar.
        app.reqHistoricalData(1, fx, queryTime, "1 D", "1 min", "MIDPOINT", 0, 1, True, [])

wrap = MyWrapper()        
app = EClient(wrap)
app.connect("127.0.0.1", 7497, clientId=123)

#I just use this in jupyter so I can interact with df
import threading
threading.Thread(target = app.run).start()

#this isn't needed in jupyter, just run another cell
import time
time.sleep(300) # in 5 minutes check the df and close

print(wrap.df)
wrap.df.to_csv("myfile.csv")#save in file
app.disconnect()

#in jupyter to show plot
%matplotlib inline 
wrap.df.close.plot()

我使用 jupyter notebook,所以我添加了线程,所以我仍然可以交互。

这是一些输出。接收和打印的第一个数据来自historyDataEnd。数据帧由具有日期时间索引的变量组成,因此可以按时间添加条形图。

HistoricalDataEnd. ReqId: 1 from 20200707 14:23:19 to 20200708 14:23:19

然后在 300 秒后打印数据帧。检查 ohlc 是否合乎逻辑,并注意每分钟都有一个新柱。14:28 柱只是我假设的前 19 秒,因为我的五分钟(300 秒)从 14:23:19 开始。这正是您想要和期望保持图表最新的行为。

2020-07-08 14:24:00  107.231  107.236  107.231  107.233     -1       -1   
2020-07-08 14:25:00  107.233  107.234   107.23  107.232     -1       -1   
2020-07-08 14:26:00  107.232  107.232  107.225  107.232     -1       -1   
2020-07-08 14:27:00  107.232  107.239  107.231  107.239     -1       -1   
2020-07-08 14:28:00  107.239  107.239  107.236  107.236     -1       -1   

您可以看到它获取了所有条形图(仅在图表中关闭)并使其保持最新状态。 日元

于 2020-07-08T17:19:46.243 回答
0
  1. 这真的是 ETL(提取、转换、加载)
  2. 我可以看到每个数据元素的形式为Name:。使用它作为 reg expr 获取所有名称标记
  3. 使用此列表,根据标记和下一个标记的位置将每个标记提取到字典中
  4. 获取第一个令牌之前的数据标签
  5. 最后把它变成一个熊猫数据框
text= "HistoricalDataUpdate.  1 Date: 20200708  08:31:00 Open: 14.17 High: 14.17 Low: 14.17 Close: 14.17 Volume: -1 Average: 14.15 Count: -1"
tokens = re.findall("([A-Z][a-z]*:)", text)
json = {t:text[re.search(tokens[i], text).span(0)[1]:re.search(tokens[i+1], text).span(0)[0]] 
        if i+1<len(tokens) 
        else text[re.search(tokens[i], text).span(0)[1]:] 
        for i,t in enumerate(tokens)}
json = {"label":text[:re.search(tokens[0], text).span(0)[0]], **json}
df = pd.DataFrame([json])
df

输出

    label   Date:   Open:   High:   Low:    Close:  Volume: Average:    Count:
0   HistoricalDataUpdate. 1 20200708 08:31:00   14.17   14.17   14.17   14.17   -1  14.15   -1


于 2020-07-08T15:00:20.857 回答