我正在使用 RabbitMQ 生产者向消费者发送长时间运行的任务(30 分钟以上)。问题是当与服务器的连接关闭并且未确认的任务重新排队时,消费者仍在处理任务。
通过研究,我了解到可以使用心跳或增加连接超时 来解决此问题。这两种解决方案在尝试它们时都会引发错误。在阅读类似帖子的答案时,我还了解到,自发布答案以来,RabbitMQ 已经实现了许多更改(例如,默认心跳超时已从 RabbitMQ 3.5.5 之前的 580 更改为 60)。
指定心跳和阻塞的连接超时时:
credentials = pika.PlainCredentials('user', 'password')
parameters = pika.ConnectionParameters('XXX.XXX.XXX.XXX', port, '/', credentials, blocked_connection_timeout=2000)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
显示以下错误:
TypeError: __init__() got an unexpected keyword argument 'blocked_connection_timeout'
在连接参数中指定heartbeat_interval=1000
时,会显示类似的错误:TypeError: __init__() got an unexpected keyword argument 'heartbeat_interval'
同样socket_timeout = 1000
显示以下错误:TypeError: __init__() got an unexpected keyword argument 'socket_timeout'
我在 Ubuntu 14.04 上运行 RabbitMQ 3.6.1、pika 0.10.0 和 python 2.7。
- 为什么上述方法会产生错误?
- 可以在长时间运行的连续任务中使用心跳方法吗?例如,在执行需要 30 多分钟的大型数据库连接时可以使用心跳吗?我赞成心跳方法,因为很多时候很难判断数据库连接等任务需要多长时间。
我已经阅读了类似问题的答案
更新:运行pika 文档中的代码会产生相同的错误。