3

我正在尝试新的 Python Interactive Broker API,但在第一步我遇到了一些严重的速度问题......

以下代码(见下文)次

0:00:08.832813直到接收完数据

0:00:36.000785直到应用程序完全断开连接...

为什么这么慢?加快速度的最佳方法是什么?

from ibapi import wrapper
from ibapi.client import EClient
from ibapi.utils import iswrapper #just for decorator
from ibapi.common import *
from ibapi.contract import *
import datetime
from datetime import timedelta


class DataApp(wrapper.EWrapper, EClient):
    def __init__(self):
        wrapper.EWrapper.__init__(self)
        EClient.__init__(self, wrapper=self)

    @iswrapper
    def historicalData(self, reqId: TickerId, date: str, open: float, high: float,
                            low: float, close: float, volume: int, barCount: int,
                            WAP: float, hasGaps: int):
        super().historicalData(reqId, date, open, high, low, close, volume,
                                barCount, WAP, hasGaps)
        print("HistoricalData. ", reqId, " Date:", date, "Open:", open,
               "High:", high, "Low:", low, "Close:", close, "Volume:", volume)

    @iswrapper
    def historicalDataEnd(self, reqId: int, start: str, end: str):
        super().historicalDataEnd(reqId, start, end)
        print("HistoricalDataEnd ", reqId, "from", start, "to", end)
        print(datetime.datetime.now()-startime)
        self.done = True # This ends the messages loop - this was not in the example code...

    def get_data(self):        
        self.connect("127.0.0.1", 4002, clientId=10)
        print("serverVersion:%s connectionTime:%s" % (self.serverVersion(),
                                                self.twsConnectionTime()))

        cont = Contract()
        cont.symbol = "ES"
        cont.secType = "FUT"
        cont.currency = "USD"
        cont.exchange = "GLOBEX"
        cont.lastTradeDateOrContractMonth = "201706"
        self.reqHistoricalData(1, cont, datetime.datetime.now().strftime("%Y%m%d %H:%M:%S"),
                               "1800 S", "30 mins", "TRADES", 0, 1, [])
        self.run()        
        self.disconnect()
        print(datetime.datetime.now()-startime)

global starttime
startime = datetime.datetime.now()
DA = DataApp()
DA.get_data()

我还尝试在后台继续运行它,以便只在运行时提交请求

def runMe():
    app.run() # where run() has be removed from the class definition

import threading
thread = threading.Thread(target = runMe)
thread.start()

但它也非常缓慢。任何建议表示赞赏

4

3 回答 3

2

我建议您在 ibapi 模块的连接类中修改连接套接字锁。推荐来自github上的heshiming;如果您有权访问私人互动经纪人回购,您可以在此处访问讨论https://github.com/InteractiveBrokers/tws-api/issues/464

我这样做了,它显着提高了性能。

合时明建议您减少套接字锁定对象的超时时间,每次发送或接收消息时都会调用该超时时间。要修改套接字锁定,请转到 ibapi 的 site-packages 文件夹并修改 connection.py 中的连接函数,将“self.socket.settimeout(1)”更改为“self.socket.settimeout(0.01)”。这是我拥有的版本的 connection.py 中的第 48 行。

万一你看不到heshiming的帖子,我已经把它放在了这个帖子的底部。

替代选项:另一个有趣的解决方案是利用 asyncio 进行异步事件循环。我还没有这样做,但它看起来很有希望。请参阅 Ewald 放在一起的示例https://github.com/erdewit/tws_async

和世铭评论:

Connection /ibapi/connection.py 的实现在 sendMsg 和 recvMsg 中都有一个 Lock 对象。由于 connect 调用了 self.socket.settimeout(1),因此底层的 self.socket.recv(4096) 每秒只超时一次。

这样的实现会产生性能问题。由于锁是共享的,因此套接字在接收时无法发送数据。在接收到的消息长度小于 4k 字节的场景下,recvMsg 函数会在释放锁之前等待 1 秒,使后续的 sendMsg 等待。在我的实验中,大多数消息似乎都小于 4k 字节。换句话说,这规定了每秒一个 recvMsg 的上限。

有几种策略可以缓解这种情况。可以将接收缓冲区减少到远小于 4k 的数字,或者将套接字超时减少到 0.001 秒以减少阻塞。

或者根据 http://stackoverflow.com/questions/1981372/are-parallel-calls-to-send-recv-on-the-same-socket-valid ,套接字本身实际上是线程安全的。因此不需要锁。

我尝试了所有三种策略。卸下锁效果最好。将超时时间减少到 0.001 的工作方式类似。

我只能保证 linux/unix 平台,我还没有在 Windows 上尝试过。您会考虑更改实施以改善这一点吗?

于 2017-06-20T16:35:14.293 回答
0

当前的 API(2019 年 5 月)通过删除recvMsg@BenM 在他的回答中提出的锁定来提高速度。但是,根据请求的类型,它仍然可能很慢。

删除他们的大部分日志记录有所帮助,大概是因为某些 msg 类型非常大。我最初尝试使用logging库将其过滤掉,但完全删除代码在速度方面效果更好,我将其归结为生成更大字符串所需的额外处理,甚至在传递到logging.

用 deque 替换 Queue :用collections.dequeQueue替换in可能也是值得的,因为它的速度要快 10 倍以上。我已经在其他地方做了这个以加快速度,但还没有做到这一点。应该是没有锁的线程安全的,就像在这里使用的方式一样:“双端队列支持线程安全、内存高效的从双端队列的任一侧追加和弹出”(来自文档)。client.pydequedeque

队列与双端队列速度比较

于 2019-10-25T09:00:09.733 回答
-1

app.run()while 是一个无限循环,但是当设置为 Trueapp.done == False时它不会立即停止。app.done(我不知道为什么)。

我所做的是编写一个新方法而不是使用app.run().

这是我的解决方案:

import time
from ibapi import (decoder, reader, comm)

并将此功能放入您的客户端类。

def getMessage(self, wait=3):
    # wait 3 secs for response to come in
    time.sleep(wait)
    # get everything in app.msg_queue
    while not self.msg_queue.empty():
        text = self.msg_queue.get(block=True, timeout=0.2)
        fields = comm.read_fields(text)
        self.decoder.interpret(fields)

用法很简单。只需使用app.getMessage()而不是app.run()

于 2017-06-12T03:52:03.793 回答