4

我正在运行 Python ib-api 来接收来自 Interactive Brokers 的实时市场数据。它可以提供我期望的数据,但以“EReader 线程中未处理的异常”结尾。

from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract as IBcontract
from threading import Thread
import queue
import pandas as pd
from ibapi.ticktype import TickTypeEnum`

`DEFAULT_PRICE_DATA_ID = 1001`

`FINISHED = object()
STARTED = object()
TIME_OUT = object()`

class finishableQueue(object):

    def __init__(self, queue_to_finish):

        self._queue = queue_to_finish
        self.status = STARTED

    def get(self, timeout):

        contents_of_queue=[]
        finished=False

        while not finished:
            try:
                current_element = self._queue.get(timeout=timeout)
                if current_element is FINISHED:
                    finished = True
                    self.status = FINISHED
                else:
                    contents_of_queue.append(current_element)

            except queue.Empty:
                finished = True
                self.status = TIME_OUT

        return contents_of_queue

    def timed_out(self):
        return self.status is TIME_OUT


class TestWrapper(EWrapper):

    def __init__(self):
        self._my_price_data_dict = {}

    def get_error(self, timeout=5):
        if self.is_error():
            try:
                return self._my_errors.get(timeout=timeout)
            except queue.Empty:
                return None

        return None

    def is_error(self):
        an_error_if=not self._my_errors.empty()
        return an_error_if

    def init_error(self):
        error_queue=queue.Queue()
        self._my_errors = error_queue

    def error(self, id, errorCode, errorString):
        ## Overriden method
        errormsg = "IB error id %d errorcode %d string %s" % (id, errorCode, errorString)
        self._my_errors.put(errormsg)

    def init_ibprices(self, tickerid):
        ibprice_data_queue = self._my_price_data_dict[tickerid] = queue.Queue()

        return ibprice_data_queue

    def tickPrice(self, reqId, tickType, price, attrib):
        tickdata = (TickTypeEnum.to_str(tickType), price)

        price_data_dict = self._my_price_data_dict

        if reqId not in price_data_dict.keys():
            self.init_ibprices(reqId)

        price_data_dict[reqId].put(tickdata)


class TestClient(EClient):

    def __init__(self, wrapper):
        EClient.__init__(self, wrapper)

    def error(self, reqId, errorCode, errorString):
        print("Error: ", reqId, " ", errorCode, " ", errorString)

    def getIBrealtimedata(self, ibcontract, tickerid=DEFAULT_PRICE_DATA_ID):
        ib_data_queue = finishableQueue(self.init_ibprices(tickerid))

        self.reqMktData(
            tickerid,
            ibcontract,
            "",
            False,
            False,
            []
        )

        MAX_WAIT_SECONDS = 5
        print("Getting data from the server... could take %d seconds to complete " % MAX_WAIT_SECONDS)

        price_data = ib_data_queue.get(timeout = MAX_WAIT_SECONDS)

        while self.wrapper.is_error():
            print(self.get_error())

        if ib_data_queue.timed_out():
            print("Exceeded maximum wait for wrapper to confirm finished - seems to be normal behaviour")

        self.cancelMktData(tickerid)

        return price_data

class TestApp(TestWrapper, TestClient):
    def __init__(self, ipaddress, portid, clientid):
        TestWrapper.__init__(self)
        TestClient.__init__(self, wrapper=self)

        self.connect(ipaddress, portid, clientid)

        thread = Thread(target = self.run)
        thread.start()

        setattr(self, "_thread", thread)

        self.init_error()

def main(slist):

    app = TestApp("127.0.0.1", 7497, 1)

    for i in slist:
        ibcontract = IBcontract()
        ibcontract.secType = "STK"
        ibcontract.symbol = i
        ibcontract.exchange ="SEHK"

        Lastprice = app.getIBrealtimedata(ibcontract)

        df = pd.DataFrame(Lastprice)
        print(ibcontract.symbol, df.head())

    app.disconnect()

if __name__ == "__main__":

    seclist = [700,2318,5,12]
    main(seclist)

以下是错误消息:

EReader 线程 Traceback 中未处理的异常(最近一次调用最后一次):文件“D:\Anaconda3\envs\myweb\lib\site-packages\ibapi\reader.py”,第 34 行,>运行数据 = self.conn.recvMsg () 文件“D:\Anaconda3\envs\myweb\lib\site-packages\ibapi\connection.py”,行 >99,在 recvMsg buf = self._recvAllMsg() 文件“D:\Anaconda3\envs\myweb\ lib\site-packages\ibapi\connection.py", line >119, in _recvAllMsg buf = self.socket.recv(4096) OSError: [WinError 10038] An operation was tried on something that is >not a socket

4

2 回答 2

2

启动一个单独的线程来读取来自套接字的传入消息:

thread = Thread(target = self.run)
thread.start()

但是这个线程永远不会停止,当你调用disconnect()时它仍然在运行。结果,它尝试访问现在为 None 的套接字对象,从而触发错误。尝试通过设置在断开连接之前停止 EReader 线程done=True

附带说明一下,由于此错误发生在程序的最后断开连接时,它不应该干扰接收预期的数据。

于 2019-08-19T13:44:21.360 回答
0

避免警告的解决方法。

在您的EClient / EWrapper 子类中实现此功能:

  1. 创建一个套接字关闭函数:
import socket, time
[...]

    def _socketShutdown(self):
        self.conn.lock.acquire()
        try:
            if self.conn.socket is not None:
                self.conn.socket.shutdown(socket.SHUT_WR)
        finally:
            self.conn.lock.release()
  1. 在关闭连接之前使用它:
    self._socketShutdown()
    time.sleep(1) 
    self.disconnect()
于 2021-01-07T15:40:45.360 回答