0

这可能是关于 python 回调和使用 pika 一样多的问题。我正在尝试开发一些代码来订阅 RabbitMQ 中的队列,处理任何已传递消息的有效负载,然后将该有效负载写入一系列(磁盘)文件。因此,使用http://www.rabbitmq.com/tutorials/tutorial-one-python.html上的简单“Hello World”示例,我在回调函数中添加了逻辑(顺便称为“回调” ) 将任何接收到的消息有效负载写入文件。

这是主要问题:我想编写一些额外的代码,如果某个时间段已经过去,例如 300 秒(5 分钟),那么进程应该关闭文件并创建一个新文件,并将收到的任何后续新消息写入那。等等 ...

但是 - 我看到的问题是只有当消息到达队列时才会调用回调函数。我想我需要一些测量经过时间的回调函数之外的过程......

基本原理是我想创建一组磁盘文件(所有文件都具有基于时间戳的唯一名称),其中包含 MQ 队列中接收到的消息。如果消息来得很慢,那么我关闭当前打开的文件(以便可以在下游进一步处理)并打开另一个。

我还注意到,在发出开始消费调用 (channel.start_sumption) 之后,没有达到下面的代码 - 为什么?

我玩过python的多处理模块,但到目前为止还没有运气。

这是一些带有伪代码注释的骨架代码:-

#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)

    # want to put code here to write message payloads to a file (unique name)
    # if n secs have elapsed then close the file and create a new file

channel.basic_consume(callback,queue='hello',no_ack=True)

channel.start_consuming()

谢谢 !

4

1 回答 1

0

可能值得看看Pika的替代实现。由于鼠兔天生会阻挡,因此很难创造出这样的东西。你基本上需要另一个线程来观察 IO,看看在过去五分钟内是否有任何东西被写入,否则关闭它。

你也可以保留一个时间戳,一旦你得到一个新的回调,如果足够的时间过去了,你可以关闭文件,并创建一个新文件。然而,这将使文件保持打开更长时间,但防止数据超过五分钟。

但是,我建议您改为查看Puka 。它是 Pika 的非阻塞替代方案,可让您更轻松地解决问题。

于 2013-06-22T13:24:13.960 回答