我们正在尝试建立一个基本的定向队列系统,其中生产者将生成多个任务,而一个或多个消费者将一次获取一个任务,处理它并确认消息。
问题是,处理可能需要 10-20 分钟,而我们当时没有响应消息,导致服务器断开我们的连接。
这是我们消费者的一些伪代码:
#!/usr/bin/env python
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
long_running_task(connection)
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
第一个任务完成后,在 BlockingConnection 深处的某个地方抛出异常,抱怨套接字被重置。此外,RabbitMQ 日志显示消费者因未及时响应而断开连接(为什么它重置连接而不是发送 FIN 很奇怪,但我们不会担心)。
我们搜索了很多,因为我们认为这是 RabbitMQ 的正常用例(有很多长时间运行的任务,应该在许多消费者之间分配),但似乎没有其他人真正遇到过这个问题。最后,我们偶然发现了一个线程,建议使用心跳并long_running_task()
在单独的线程中生成心跳。
于是代码变成了:
#!/usr/bin/env python
import pika
import time
import threading
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost',
heartbeat_interval=20))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def thread_func(ch, method, body):
long_running_task(connection)
ch.basic_ack(delivery_tag = method.delivery_tag)
def callback(ch, method, properties, body):
threading.Thread(target=thread_func, args=(ch, method, body)).start()
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
这似乎有效,但它非常混乱。我们确定ch
对象是线程安全的吗?此外,假设long_running_task()
使用该连接参数将任务添加到新队列(即,这个漫长过程的第一部分已经完成,让我们将任务发送到第二部分)。因此,线程正在使用该connection
对象。那个线程安全吗?
更重要的是,这样做的首选方式是什么?我觉得这很混乱,可能不是线程安全的,所以也许我们做得不对。谢谢!