2

我使用 pika python 库从 RabbitMQ 读取消息。在循环中读取消息由

connection = rpc.connect()
channel = connection.channel()
channel.basic_consume(rpc.consumeCallback, queue=FromQueue, no_ack=Ack)
channel.start_consuming()

这工作正常。但我也需要阅读一条消息,我这样做:

method, properties, body = channel.basic_get(queue=FromQueue)
rpc.consumeCallback(Channel=channel,Method=method, Properties=properties,Body=body)

但是,当队列中没有消息时,脚本就会出现问题。如何实现此处描述的 get_empty() 方法?

4

3 回答 3

1

我通过检查响应暂时解决了它,例如:

method, properties, body = channel.basic_get(queue=FromQueue)
if(method == None):
    ## queue is empty
于 2018-10-08T07:27:41.557 回答
0

您可以像这样检查正文中的空:

def callback(ch, method, properties, body):
    decodeBodyInfo = body.decode('utf-8')
    if decodeBodyInfo != '':
        cacheResult = decodeBodyInfo
        ch.stop_consuming()

如此简单易用:D

于 2019-05-09T04:12:31.550 回答
0

如果您channel.consume在 for 循环中使用生成器,则可以设置inactivity_timeout参数。

从 pika 文档中,

:param float inactivity_timeout: if a number is given (in seconds), will cause the
method to yield (None, None, None) after the given period of inactivity; this 
permits for pseudo-regular maintenance activities to be carried out by the user 
while waiting for messages to arrive. If None is given (default), then the method 
blocks until the next event arrives. NOTE that timing granularity is limited by the 
timer resolution of the underlying implementation.NEW in pika 0.10.0.

所以把你的代码改成这样可能会有所帮助

        for method_frame, properties, body in channel.consume(queue, inactivity_timeout=120):

            # break of the loop after 2 min of inactivity (no new item fetched)
            if method_frame is None
                break

退出循环后不要忘记正确处理通道和连接

于 2021-01-06T11:29:56.707 回答