我在我的python脚本中使用 pika 时遇到错误,它将从 rabbitmq 获取输入并将处理它并应该等待下一个,但是队列为空的我遇到错误,它不应该给出任何错误脚本应该启动并等待下一条消息的到来。
Traceback (most recent call last):
File "./Kusto_connection_with_rabbitmq_2.py", line 1675, in <module>
main()
File "./Kusto_connection_with_rabbitmq_2.py", line 1669, in main
channel.start_consuming()
File "/usr/local/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1865,
in start_consuming
self._process_data_events(time_limit=None)
File "/usr/local/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 2026,
in _process_data_events
self.connection.process_data_events(time_limit=time_limit)
File "/usr/local/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 824,
in process_data_events
self._flush_output(common_terminator)
File "/usr/local/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 523,
in _flush_output
raise self._closed_result.value.error
pika.exceptions.StreamLostError: Transport indicated EOF
我的 python 代码看起来像:
def callback(ch, method, properties, body):
rabbit_data=[]
ch.exchange_declare(exchange='DataEx', exchange_type='fanout',durable=True)
if body != None:
rabbit_data.append(body)
if len(rabbit_data) > 0:
for i in rabbit_data:
final_packet=call_it(i)
if final_packet:
print (len(final_packet))
#out = { "reading_data": json.loads(a),
# "emission_data":json.loads(b)}
for data_output in final_packet:
ch.basic_publish(exchange='DataEx',routing_key='',
body=json.dumps(data_output),
properties=pika.BasicProperties(
content_type="text/plain",
delivery_mode=2))
#
rabbit_data.remove(i)
else:
#ch.basic_reject(delivery_tag = method.delivery_tag,requeue=True)
pass
ch.basic_ack(delivery_tag = method.delivery_tag,multiple=False)
#ch.basic_reject(delivery_tag = method.delivery_tag,requeue=True)
#
#print ("final rabbit data {}".format(rabbit_data))
def main():
credentials = pika.PlainCredentials(username, password)
parameters = pika.ConnectionParameters(host=Api_url,virtual_host=rmqvhost,credentials=credentials,heartbeat=0)
print (username,password)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue='Plume',durable=True)
channel.basic_qos(prefetch_size=0,prefetch_count=1) # this is for acknowdeging packet one by one
channel.basic_consume(queue='Plume', on_message_callback=callback,auto_ack=False)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
#channel.stop_consuming()
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)
任何人都可以在这里帮助我吗?