我目前正在将一些数据加载到数据框中,从网络上提取几个代码(证券)的数据,我想遍历这些证券并在特定队列中创建消息。
我有以下代码应该按预期工作。
bps = get_latest_breakpoints(as_of_date=as_of_date) # returns a pandas DataFrame
tickers = [str(ticker).replace(" ", '') for ticker in bps.index]
for ticker in tickers:
body = json.dumps({'ticker': ticker})
print(body)
# MAYBE DISPATCH ON A QUEUE
process_twitter_channel.basic_publish(
routing_key=QUEUE_GET_TICKER_TWEETS,
exchange=QUEUE_GET_TICKER_TWEETS,
body=body,
properties=BasicProperties(
delivery_mode=2, # make message persistent
)
)
================== 输出 ======================
{"ticker": "ADSK"}
2021-11-29 16:44:57,063 Socket EOF; <socket.socket fd=6, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=6, laddr=('127.0.0.1', 53220), raddr=('127.0.0.1', 25672)>
2021-11-29 16:44:57,073 Transport indicated EOF.
2021-11-29 16:44:57,074 connection_lost: StreamLostError: ('Transport indicated EOF',)
2021-11-29 16:44:57,078 Unexpected connection close detected: StreamLostError: ('Transport indicated EOF',)
Traceback (most recent call last):
File "marketreader/actions/create_breakpoint_reports.py", line 70, in <module>
run_breakpoint_reports()
File "marketreader/actions/create_breakpoint_reports.py", line 59, in run_breakpoint_reports
process_twitter_channel.basic_publish(
File "/venv/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 2247, in basic_publish
self._flush_output()
File "/venv/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1335, in _flush_output
self._connection._flush_output(lambda: self.is_closed, *waiters)
File "/venv/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 523, in _flush_output
raise self._closed_result.value.error
pika.exceptions.StreamLostError: Transport indicated EOF
Buuuut,如果我不尝试在队列中分派消息,而是将它们保存到 .json 文件中,而不是从 API 获取“热”数据,而是加载先前创建的文件,它可以正常工作。我无法理解为什么会发生这种情况,因为我没有理由相信我正在访问的 API 和本地 rabbitmq 服务器之间存在冲突