这可能是关于 python 回调和使用 pika 一样多的问题。我正在尝试开发一些代码来订阅 RabbitMQ 中的队列,处理任何已传递消息的有效负载,然后将该有效负载写入一系列(磁盘)文件。因此,使用http://www.rabbitmq.com/tutorials/tutorial-one-python.html上的简单“Hello World”示例,我在回调函数中添加了逻辑(顺便称为“回调” ) 将任何接收到的消息有效负载写入文件。
这是主要问题:我想编写一些额外的代码,如果某个时间段已经过去,例如 300 秒(5 分钟),那么进程应该关闭文件并创建一个新文件,并将收到的任何后续新消息写入那。等等 ...
但是 - 我看到的问题是只有当消息到达队列时才会调用回调函数。我想我需要一些测量经过时间的回调函数之外的过程......
基本原理是我想创建一组磁盘文件(所有文件都具有基于时间戳的唯一名称),其中包含 MQ 队列中接收到的消息。如果消息来得很慢,那么我关闭当前打开的文件(以便可以在下游进一步处理)并打开另一个。
我还注意到,在发出开始消费调用 (channel.start_sumption) 之后,没有达到下面的代码 - 为什么?
我玩过python的多处理模块,但到目前为止还没有运气。
这是一些带有伪代码注释的骨架代码:-
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
# want to put code here to write message payloads to a file (unique name)
# if n secs have elapsed then close the file and create a new file
channel.basic_consume(callback,queue='hello',no_ack=True)
channel.start_consuming()
谢谢 !