0

我在我的python脚本中使用 pika 时遇到错误,它将从 rabbitmq 获取输入并将处理它并应该等待下一个,但是队列为空的我遇到错误,它不应该给出任何错误脚本应该启动并等待下一条消息的到来。

Traceback (most recent call last):
File "./Kusto_connection_with_rabbitmq_2.py", line 1675, in <module>
  main()
File "./Kusto_connection_with_rabbitmq_2.py", line 1669, in main
  channel.start_consuming()
File "/usr/local/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 1865, 
  in start_consuming
self._process_data_events(time_limit=None)
File "/usr/local/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 2026, 
  in  _process_data_events
self.connection.process_data_events(time_limit=time_limit)
File "/usr/local/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 824, 
in process_data_events
self._flush_output(common_terminator)
File "/usr/local/lib/python3.8/site-packages/pika/adapters/blocking_connection.py", line 523, 
 in _flush_output
 raise self._closed_result.value.error
pika.exceptions.StreamLostError: Transport indicated EOF

我的 python 代码看起来像:

def callback(ch, method, properties, body):
rabbit_data=[]
ch.exchange_declare(exchange='DataEx', exchange_type='fanout',durable=True)

if body != None:
    
    rabbit_data.append(body)
    
    if len(rabbit_data) > 0:    
        
        for i in rabbit_data:
            final_packet=call_it(i)
            if final_packet:
                print (len(final_packet))
            
                #out = { "reading_data": json.loads(a),
                #      "emission_data":json.loads(b)}   
                for data_output in final_packet: 
                
                
                    ch.basic_publish(exchange='DataEx',routing_key='',
                                 body=json.dumps(data_output),
                                 properties=pika.BasicProperties(
                                     content_type="text/plain",
                                     delivery_mode=2))
                
                #
                rabbit_data.remove(i)
            else:
                #ch.basic_reject(delivery_tag = method.delivery_tag,requeue=True)
                pass
                
ch.basic_ack(delivery_tag = method.delivery_tag,multiple=False)   
#ch.basic_reject(delivery_tag = method.delivery_tag,requeue=True) 
#        

        
#print ("final rabbit data {}".format(rabbit_data))



def main():

credentials = pika.PlainCredentials(username, password)
parameters = pika.ConnectionParameters(host=Api_url,virtual_host=rmqvhost,credentials=credentials,heartbeat=0)
print (username,password)

connection = pika.BlockingConnection(parameters)
channel = connection.channel()

channel.queue_declare(queue='Plume',durable=True)


channel.basic_qos(prefetch_size=0,prefetch_count=1) # this is for acknowdeging packet one by one 
channel.basic_consume(queue='Plume', on_message_callback=callback,auto_ack=False)


print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
#channel.stop_consuming()


if __name__ == '__main__':
try:
    main()
except KeyboardInterrupt:
    print('Interrupted')
    try:
        sys.exit(0)
    except SystemExit:
        os._exit(0)

任何人都可以在这里帮助我吗?

4

0 回答 0