首先你定义一个Queue
:
from Queue import Queue
q = Queue()
然后,在您的线程中,您尝试从该队列中获取一个项目:
msg = q.get()
这将阻塞整个线程,直到在队列中找到一些东西。
现在,同时,假设您的传入事件通过触发回调的方式得到通知,您注册一个回调,它只是将接收到的 RabbitMQ 消息放入队列中:
def on_message(msg):
q.put(msg)
rabbitmq_channel.register_callback(on_message)
或者如果你喜欢更短的代码:
rabbitmq_channel.register_callback(lambda msg: q.put(msg))
(上面是伪代码,因为我没有使用过 RabbitMQ 也没有使用任何 Python 绑定 RabbitMQ,但是您应该能够轻松地弄清楚如何将代码段改编为您的实际应用程序代码;要注意的关键部分是q.put(msg)
——只需制作确保在通知新消息后立即调用该部分。)
一旦发生这种情况,线程就会被唤醒并可以自由处理消息。为了对多条消息重用同一个线程,只需使用一个while
循环:
while True:
msg = q.get()
process_message(msg)
PS我建议研究Gevent以及如何在你的Python应用程序中将它与RabbitMQ结合起来,以便能够摆脱线程并使用更轻量级和可扩展的绿色线程机制,而无需管理线程池(因为你可以只有成千上万的greenlets在飞行中产生并被杀死):
# this thing always called in a green thread; forget about pools and queues.
def on_message(msg):
# you're in a green thread now; just process away!
benefit_from("all the gevent goodness!")
spawn_and_join_10_sub_greenlets()
rabbitmq_channel.register_callback(lambda msg: gevent.spawn(on_message, msg))