问题标签 [py-amqplib]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
3130 浏览

python - 如何使用 py-amqplib 在多个队列上等待消息

我正在使用py-amqplib在 Python 中访问 RabbitMQ。应用程序会不时收到监听某些 MQ 主题的请求。

第一次收到这样的请求时,它会创建一个 AMQP 连接和一个通道,并启动一个新线程来监听消息:

AMQPListener非常简单:

创建连接后,它订阅感兴趣的主题,如下所示:

第一次这一切都很好。但是,它在后续请求订阅另一个主题时失败。在后续请求中,我重新使用 AMQP 连接和 AMQPListener 线程(因为我不想为每个主题启动一个新线程)并且当我调用上面的代码块时,channel.queue_declare()方法调用永远不会返回。我还尝试在那时创建一个新频道,并且connection.channel()调用也永远不会返回。

我能够让它工作的唯一方法是为每个主题(即routing_key)创建一个新的连接、通道和侦听器线程,但这真的不理想。我怀疑是 wait() 方法以某种方式阻塞了整个连接,但我不确定该怎么做。当然,我应该能够使用单个侦听器线程接收具有多个路由键(甚至在多个通道上)的消息?

一个相关的问题是:当该主题不再感兴趣时,我如何停止侦听器线程?如果没有消息,channel.wait() 调用似乎会永远阻塞我能想到的唯一方法是向队列发送一条“毒化”它的虚拟消息,即。被听者解释为停止的信号。

0 投票
2 回答
3335 浏览

python - 多个消费者和生产者连接到一个消息队列,这在 AMQP 中可能吗?

我想创建一个能够 OCR 文本的进程场。我考虑过使用由多个 OCR 进程读取的单个消息队列。

我想确保:

  • 队列中的每条消息最终都会被处理
  • 工作或多或少平均分配
  • 图像将仅由一个 OCR 进程解析
  • OCR 进程不会一次收到多条消息(因此任何其他免费的 OCR 进程都可以处理该消息)。

使用AMQP可以做到吗?

我打算使用python和rabbitmq

0 投票
3 回答
1163 浏览

python - 如何在 AMQP 的 python 客户端中使用监听 basic.return

我想确保我的消息已发送到队列。

为此,我将强制参数添加到 basic_publish。basic.return如果我的消息没有成功发送,我还应该怎么做才能收到消息?

我不能channel.wait()用来收听,basic.return因为当我的消息成功传递时,该wait()功能将永远挂起。(没有超时)另一方面。当我不打电话时,即使消息没有送达,遗嘱也会保持空白channel.wait()channel.returned_messages

我使用py-amqplib的是 0.6 版。

欢迎任何解决方案。

0 投票
1 回答
454 浏览

python - 在两个 RabbitMQ 队列中发布消息,而不是一个(使用 py-amqp)

我使用py-amqpFlopsy模块遇到了这个奇怪的问题。我编写了一个将消息发送到 RabbitMQ 服务器的发布者,我希望能够将其发送到指定的队列。在 Flopsy 模块上这是不可能的,所以我对其进行了调整,添加了一个参数和一行来在 Publisher 对象的 _init__ 方法上声明队列

通道对象是 py-amqplib 库的一部分

我遇到的问题是,即使它将消息发送到指定的队列,它也会将消息发送到默认队列。因为在这个系统中,我们希望发送相当多的消息,我们不想强调系统制造无用的重复......我试图调试代码并进入 py-amqplib 库,但我无法找出任何错误或缺少步骤。此外,我无法在代码之外找到任何文档形式的 py-amqplib。

关于为什么会发生这种情况以及如何纠正它的任何想法?

0 投票
5 回答
14802 浏览

.net - 等待一条带有超时的 RabbitMQ 消息

我想向 RabbitMQ 服务器发送一条消息,然后等待回复消息(在“回复”队列上)。当然,我不想永远等待,以防处理这些消息的应用程序出现故障 - 需要超时。这听起来像是一项非常基本的任务,但我找不到这样做的方法。我现在在py-amqplibRabbitMQ .NET 客户端都遇到了这个问题。

到目前为止我得到的最好的解决方案是使用中间进行轮询basic_getsleep但这很丑陋:

当然有更好的方法吗?

0 投票
1 回答
263 浏览

python - py-amqp/flopsy:在 Python 中等待单个 AMQP 消息

我有一个与此类似的问题在 py-amqp/flopsy 中很容易说“我将永远等待,并且我希望在收到消息时调用此回调”,但我找不到任何方式说“好的,我收到了我想要的消息,现在停止等待。” (也许是 GOTO?开个玩笑......)有没有一种优雅的方式来做到这一点?

0 投票
3 回答
10999 浏览

rabbitmq - 如何从 RabbitMQ 中删除队列绑定?

我正在使用 RabbitMQ 按主题将消息路由到感兴趣的订阅者。每个订阅者都有一个队列,我将队列绑定到他们感兴趣的主题。我想允许用户从他们的主题列表中删除一个项目。

在我的设置中,这将需要从该用户的队列中“解除绑定”绑定主题。

我正在使用 pyamqplib,但我没有看到通过通道对象执行此操作的方法。他们是一种从队列中删除先前绑定的路由键的方法吗?

0 投票
1 回答
1762 浏览

transactions - Kombu 和 Rabbitmq 的 AMQP 交易

我似乎找不到任何关于通过Kombu api使用AMQP 事务的文档。此页面讨论了将消息附加到事务状态,但似乎并不相关。

我知道 pika 后端支持它们,我很确定 amqplib 后端(我目前正在使用)也支持,但我还没有看到它是如何在 Kombu 中公开的。

编辑:澄清一下,我正在寻找 channel.commit()、channel.select()、... 类型方法

0 投票
2 回答
1579 浏览

python - How to detect an exchange does not exist with py-amqp

I want to be able to detect whether an exchange does not exist when submitting a message to AMQP.

Consider following example.

This script keeps publishing to the exchange but does not raise any errors if the exchange does not exist. When the exchange exists, the messages arrive.

When I add outgoing.wait() a amqp.exceptions.NotFound is raised which is what I want. The problem is however that if in this case the exchange exists, the message arrives but outgoing.wait() blocks my loop. (I could run outgoing.wait() in a separate thread but that I do not want to do.)

How to deal with this?

Any advice tips pointers welcome

Thanks,

Jay

0 投票
3 回答
41364 浏览

python - 获取 RabbitMQ 队列中的消息数

我们正在使用amqplib发布/使用消息。我希望能够读取队列上的消息数量(理想情况下,既已确认又未确认)。这将允许我向管理员用户显示一个漂亮的状态图,并检测某个组件是否跟不上负载。

我在 amqplib 文档中找不到有关读取队列状态的任何信息。

有人可以指出我正确的方向吗?