0

我目前正在将一些数据加载到数据框中,从网络上提取几个代码(证券)的数据,我想遍历这些证券并在特定队列中创建消息。

我有以下代码应该按预期工作。

 bps = get_latest_breakpoints(as_of_date=as_of_date) # returns a pandas DataFrame
 tickers = [str(ticker).replace(" ", '') for ticker in bps.index]
 for ticker in tickers:
        body = json.dumps({'ticker': ticker})
        print(body)
        # MAYBE DISPATCH ON A QUEUE
        process_twitter_channel.basic_publish(
            routing_key=QUEUE_GET_TICKER_TWEETS,
            exchange=QUEUE_GET_TICKER_TWEETS,
            body=body,
            properties=BasicProperties(
                delivery_mode=2,  # make message persistent
            )
        )

================== 输出 ======================

{"ticker": "ADSK"}
2021-11-29 16:44:57,063 Socket EOF; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 53220), raddr=('127.0.0.1', 25672)>
2021-11-29 16:44:57,073 Transport indicated EOF.
2021-11-29 16:44:57,074 connection_lost: StreamLostError: ('Transport indicated EOF',)
2021-11-29 16:44:57,078 Unexpected connection close detected: StreamLostError: ('Transport indicated EOF',)
Traceback (most recent call last):
  File "marketreader/actions/create_breakpoint_reports.py", line 70, in <module>
    run_breakpoint_reports()
  File "marketreader/actions/create_breakpoint_reports.py", line 59, in run_breakpoint_reports
    process_twitter_channel.basic_publish(
  File "/venv/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 2247, in basic_publish
    self._flush_output()
  File "/venv/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1335, in _flush_output
    self._connection._flush_output(lambda: self.is_closed, *waiters)
  File "/venv/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 523, in _flush_output
    raise self._closed_result.value.error
pika.exceptions.StreamLostError: Transport indicated EOF

Buuuut,如果我不尝试在队列中分派消息,而是将它们保存到 .json 文件中,而不是从 API 获取“热”数据,而是加载先前创建的文件,它可以正常工作。我无法理解为什么会发生这种情况,因为我没有理由相信我正在访问的 API 和本地 rabbitmq 服务器之间存在冲突

4

0 回答 0