0

我将pika与托管的CloudAMQP实例一起使用。

消息以短时间的形式发布到队列中:大约 10 条消息/秒,然后几分钟内什么也没有。消费者有时可能需要大约 30 秒来处理一条消息。我的简单消费者代码是:

import pika, os, time

url = os.environ.get('CLOUDAMQP_URL')
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel()

def callback(ch, method, properties, body):
  print("Received " + str(body), method, properties)
  # ... long task that is equivalent to:
  time.sleep(30)

queue_name = 'test-queue'

channel.queue_declare(queue=queue_name, durable=True, exclusive=False, auto_delete=False)

channel.basic_consume(queue_name, callback, auto_ack=True)

channel.start_consuming()
connection.close()

有时,我会看到以下行为:

  • 大约 20-30 条消息在几次快速爆发中发布到队列中
  • 消费者获取所有排队的消息,一口气自动确认它们,即它们都从队列中“消失”
  • 在处理完自动确认的消息后,pika 会抛出以下异常:
pika.exceptions.StreamLostError: Stream connection lost: RxEndOfFile(-1, 'End of input stream (EOF)')

(下面的完整追溯)

auto_ack=True通过手动禁用和确认消息解决了我的问题(见下文)。

是否有解决此问题的替代方法?EOF Exception 是否发生是因为 CloudAMQP/RabbitMQ 服务器没有及时收到心跳,并关闭了连接?或者是 pika 的内部超时..?谢谢!


追溯:

Traceback (most recent call last):
 File "/app/app.py", line 146, in <module>
   pika_obj['pika_channel'].start_consuming()
 File "/app/.heroku/python/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 1866, in start_consuming
   self._process_data_events(time_limit=None)
 File "/app/.heroku/python/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 2027, in _process_data_events
   self.connection.process_data_events(time_limit=time_limit)
 File "/app/.heroku/python/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 825, in process_data_events
   self._flush_output(common_terminator)
 File "/app/.heroku/python/lib/python3.9/site-packages/pika/adapters/blocking_connection.py", line 522, in _flush_output
   raise self._closed_result.value.error
pika.exceptions.StreamLostError: Stream connection lost: RxEndOfFile(-1, 'End of input stream (EOF)')
4

1 回答 1

0

通过引入一个简单的更改,我能够修复上面的代码:在处理每条消息后手动设置auto_ack=False和调用basic_ack

url = os.environ.get('CLOUDAMQP_URL')
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel = connection.channel()

def callback(ch, method, properties, body):
  print("Received " + str(body), method, properties)
  # ... long task that is equivalent to:
  time.sleep(30)
  # ack the message manually
  ch.basic_ack(delivery_tag=method.delivery_tag)

queue_name = 'test-queue'

channel.queue_declare(queue=queue_name, durable=True, exclusive=False, auto_delete=False)

channel.basic_consume(queue_name, callback, auto_ack=False)

channel.start_consuming()
connection.close()
于 2021-12-09T23:07:00.860 回答