2

我想确保我的消息已发送到队列。

为此,我将强制参数添加到 basic_publish。basic.return如果我的消息没有成功发送,我还应该怎么做才能收到消息?

我不能channel.wait()用来收听,basic.return因为当我的消息成功传递时,该wait()功能将永远挂起。(没有超时)另一方面。当我不打电话时,即使消息没有送达,遗嘱也会保持空白channel.wait()channel.returned_messages

我使用py-amqplib的是 0.6 版。

欢迎任何解决方案。

4

3 回答 3

1

目前是不可能的,因为basic.return当消息在代理中被丢弃时是异步发送的。成功发送消息后,服务器不会报告任何数据。所以 pyAMQP 不能监听这样的消息。

我读过一些关于这个问题的帖子。可能的解决方案是:

  • 使用 txAMQP,处理 basic.return 的 amqp 的扭曲版本
  • 使用 pyAMQP 并等待超时。(我不确定目前是否可行)
  • 使用同步命令频繁 ping 服务器,以便 pyAMQP 能够在basic.return消息到达时选择消息。

因为对 pyAMQP 和 rabbitMQ 的支持水平普遍很低,我们决定根本不使用 amqp 代理。

于 2010-02-10T22:30:26.483 回答
1

您不能同步执行此操作,因为它是一个异步系统。但是你可以使用线程来解决这个问题。

基本思想是,您启动一​​个在通道上等待的线程,每当它退出等待时,它都会为返回的消息队列中的任何返回消息调用 call_back 函数。然后,您可以在 call_back 函数中处理该消息

def registerCallback(channel, call_back):
    """ This method sets up a thread which deals with the asynchronous callback for a message which could not be routed by the exchange.
    """
    def wait():
        try:
            channel.wait()
        except Exception, e:
            print("Problem waiting on publish channel: %s" % str(e))

        while not channel.returned_messages.empty():
            returnedMessage = channel.returned_messages.get()
            processReturnedMessageThread = Thread(target=call_back, args=(returnedMessage))
            processReturnedMessageThread.start()

        wait()

    waiting = Thread(target=wait) 
    waiting.start()
于 2011-10-13T09:02:55.830 回答
1

您是否尝试过唯一完整的 Python AMQP 库?它没有被广泛使用,因为它没有整齐地包装。

步骤 1. 编译 C 库 - 您可能需要sudo apt-get install autotools-dev autoconf automake libtool

mkdir rabbitc
cd rabbitc
hg clone http://hg.rabbitmq.com/rabbitmq-codegen/
hg clone http://hg.rabbitmq.com/rabbitmq-c/
cd rabbitmq-c
autoreconf -i
make clean
./configure --prefix=/usr
make
sudo make install

步骤 2. 安装 Python 库

pip install pylibrabbitmq
于 2011-07-05T02:20:03.737 回答