14

我正在使用 RabbitMQ 生产者向消费者发送长时间运行的任务(30 分钟以上)。问题是当与服务器的连接关闭并且未确认的任务重新排队时,消费者仍在处理任务。

通过研究,我了解到可以使用心跳增加连接超时 来解决此问题。这两种解决方案在尝试它们时都会引发错误。在阅读类似帖子的答案时,我还了解到,自发布答案以来,RabbitMQ 已经实现了许多更改(例如,默认心跳超时已从 RabbitMQ 3.5.5 之前的 580 更改为 60)。

指定心跳和阻塞的连接超时时:

credentials = pika.PlainCredentials('user', 'password')
parameters = pika.ConnectionParameters('XXX.XXX.XXX.XXX', port, '/', credentials, blocked_connection_timeout=2000)
connection = pika.BlockingConnection(parameters)

channel = connection.channel()

显示以下错误:

TypeError: __init__() got an unexpected keyword argument 'blocked_connection_timeout'

在连接参数中指定heartbeat_interval=1000时,会显示类似的错误:TypeError: __init__() got an unexpected keyword argument 'heartbeat_interval'

同样socket_timeout = 1000显示以下错误:TypeError: __init__() got an unexpected keyword argument 'socket_timeout'

我在 Ubuntu 14.04 上运行 RabbitMQ 3.6.1、pika 0.10.0 和 python 2.7。

  1. 为什么上述方法会产生错误?
  2. 可以在长时间运行的连续任务中使用心跳方法吗?例如,在执行需要 30 多分钟的大型数据库连接时可以使用心跳吗?我赞成心跳方法,因为很多时候很难判断数据库连接等任务需要多长时间。

我已经阅读了类似问题的答案

更新:运行pika 文档中的代码会产生相同的错误。

4

2 回答 2

11

我的系统也遇到了同样的问题,正如您所看到的,在很长的任务期间连接断开。

如果您的网络设置导致空闲 TCP/IP 连接被强制丢弃,则心跳可能有助于保持连接有效。但是,如果不是这种情况,那么更改心跳将无济于事。

更改连接超时根本无济于事。此设置仅在最初创建连接时使用。

我正在使用 RabbitMQ 生产者向消费者发送长时间运行的任务(30 分钟以上)。问题是当与服务器的连接关闭并且未确认的任务重新排队时,消费者仍在处理任务。

这有两个原因,您已经遇到过这两个原因:

  1. 即使在最好的情况下,连接也会随机断开
  2. 由于重新排队的消息而重新启动进程可能会导致问题

部署 RabbitMQ 代码的任务范围从不到一秒到几个小时不等,我发现立即确认消息并使用状态消息更新系统最适合像这样的非常长的任务。

您将需要一个记录系统(可能带有数据库)来跟踪给定工作的状态。

当消费者收到一条消息并启动流程时,它应该立即确认该消息并向记录系统发送“已启动”状态消息。

当该过程完成时,发送另一条消息说它已完成。

这不会解决掉线问题,但无论如何都不会 100% 解决这个问题。相反,它将防止在连接断开时发生消息重新排队问题。

但是,这个解决方案确实引入了另一个问题:当长时间运行的进程崩溃时,如何恢复工作?

基本答案是使用作业的记录系统(您的数据库)状态来告诉您您需要重新开始该作业。当应用程序启动时,检查数据库以查看是否有未完成的工作。如果有,请以任何适当的方式恢复或重新启动该工作。

于 2016-03-21T12:22:41.823 回答
-1

我已经看到了这个问题。原因是您声明使用此队列。但是您没有在交换中绑定队列。

例如:

 @Bean(name = "test_queue")
 public Queue testQueue() {
        return queue("test_queue");
 }

@RabbitListener(queues = "test_queue_1")
public void listenCreateEvent(){
}

如果您收听队列未绑定到交换。它会发生。

于 2019-06-14T10:00:53.823 回答