问题标签 [pika]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
2935 浏览

python - 如何通过共享事件循环将 Tornado 和 Pika 与 RabbitMQ 连接起来?

我在 Pika 中使用 TornadoConnection 适配器,但是发现文档很轻。我想要:

  1. Tornado 通过处理程序启动 Pika 生产者
  2. Pika 执行消费者
  3. 消费完成时通过回调通知 Tornado 并更新 Web 客户端

文档展示了如何使用 Tornado IOLoop,但我还没有看到这样的例子。非常感激。


0 投票
2 回答
2169 浏览

python - Pika 和 RabbitMQ - 在特定节点上创建队列

我有一个带有两个节点的 rabbitmq 集群。我想创建一些要托管在 node1 上的队列,以及托管在 node2 上的其他队列。

即使我将 ConnectionParameters 中的主机设置为 node2,队列仍将最终在 node1 上创建。

以编程方式,我不确定如何使用 pika 指定要在其上创建队列的节点。中没有这样的参数queue_declare,并且像这样传递参数似乎不起作用:

有没有指定托管节点的接口?有没有其他方法可以处理这种情况?

谢谢!

0 投票
7 回答
30335 浏览

rabbitmq - 在 pika / RabbitMQ 中处理长时间运行的任务

我们正在尝试建立一个基本的定向队列系统,其中生产者将生成多个任务,而一个或多个消费者将一次获取一个任务,处理它并确认消息。

问题是,处理可能需要 10-20 分钟,而我们当时没有响应消息,导致服务器断开我们的连接。

这是我们消费者的一些伪代码:

第一个任务完成后,在 BlockingConnection 深处的某个地方抛出异常,抱怨套接字被重置。此外,RabbitMQ 日志显示消费者因未及时响应而断开连接(为什么它重置连接而不是发送 FIN 很奇怪,但我们不会担心)。

我们搜索了很多,因为我们认为这是 RabbitMQ 的正常用例(有很多长时间运行的任务,应该在许多消费者之间分配),但似乎没有其他人真正遇到过这个问题。最后,我们偶然发现了一个线程,建议使用心跳并long_running_task()在单独的线程中生成心跳。

于是代码变成了:

这似乎有效,但它非常混乱。我们确定ch对象是线程安全的吗?此外,假设long_running_task()使用该连接参数将任务添加到新队列(即,这个漫长过程的第一部分已经完成,让我们将任务发送到第二部分)。因此,线程正在使用该connection对象。那个线程安全吗?

更重要的是,这样做的首选方式是什么?我觉得这很混乱,可能不是线程安全的,所以也许我们做得不对。谢谢!

0 投票
1 回答
3183 浏览

python - Amazon ec2 中 pika-rabbitmq 的良好心跳间隔

我正在为 rabbitmq 使用最新的 pika 库(0.9.9+)。我对 rabbitmq 和 pika 的用法如下:

  1. 作为工人,我有长时间运行的任务(大约 5 分钟)。这些任务从rabbitmq 获取它们的请求。请求很少出现,即请求之间有很长的空闲时间。
  2. 我之前面临的问题与空闲连接(由于空闲连接导致的连接关闭)有关。所以,我在 pika 中启用了心跳。
  3. 现在心跳的选择是个问题。Pika 似乎是一个单线程库,其中心跳接收和确认恰好在请求时间范围之间完成。
  4. 因此,如果设置的心跳间隔小于回调函数用于执行其长时间运行计算的时间,则服务器不会收到任何心跳确认并关闭连接。
  5. 所以,我假设最小心跳间隔应该是阻塞连接中回调函数的最大计算时间。

对于亚马逊 ec2 来说,什么是好的心跳值可以防止它关闭空闲连接?

此外,有些人建议使用 rabbitmq keepalive(或 libkeepalive)来维护 tcp 连接。我认为在 tcp 层管理心跳要好得多,因为应用程序不需要管理它们。这是真的吗?与 RMQ 心跳相比,keepalive 是一个好方法吗?

我看到有些人建议使用多个线程和队列来处理长时间运行的任务。但这是长时间运行任务的唯一选择吗?在这种情况下必须使用另一个队列是非常令人失望的。

先感谢您。我想我已经详细说明了这个问题。让我知道我是否可以提供更多详细信息。

0 投票
3 回答
3170 浏览

python - 使用 Pika 客户端轮询 RabbitMQ 消息

我想在 Python 中创建一个 RabbitMQ 接收器/消费者,但不知道如何检查消息。我正在尝试在自己的循环中执行此操作,而不是使用 pika 中的回调。

如果我理解了一些事情,我可以在 Java 客户端getBasic()中检查是否有任何消息可用而不会阻塞。我不介意在收到消息时阻止,但我不想在有消息之前阻止。

我没有找到任何明确的例子,也没有弄清楚 pika 中的相应调用。

0 投票
1 回答
2418 浏览

python - Pika/RabbitMQ 连接问题 - 运行 VMWare CentOS 6.3

我刚刚设置了 CentOS 6.3 的全新 VMWare 安装。互联网正在运行,一切似乎都可以正常运行。

我正在尝试使用 RabbitMQ,但我被困在他们教程的第 1 步:

http://www.rabbitmq.com/tutorials/tutorial-one-python.html

基本上,我:

  1. 设置 Linux 实例
  2. 安装了 RabbitMQ 的所有依赖项,例如 erlang/esel
  3. 尝试他们的 Hello World 教程

它实际上在这一行失败了:

我收到此错误:

我正在尝试我所有的故障排除尝试,我有点期待其他人也有同样的问题并发布了它。哦,好吧,我猜我是第一个!

无论如何,在这一点上,我认为我没有接触过 RabbitMQ 库,所以这可能只是一个 Pika 问题。

以下是我从 Wireshark 看到的,专注于 127.0.0.1:

我可以从wireshark提供更多信息,请告诉我

0 投票
2 回答
7401 浏览

java - 如何停止使用 Java 和 Python 客户端在 RabbitMQ 中消费?

我有我使用的 Java 和 Python 客户端channel.basicConsume()。在某些时候,我想在不停止整个程序的情况下停止这些消费者。

在带有 Pika 的 Python 中,我已经进行了channel.stop_consuming()调用,但是那些会产生我忽略的错误。似乎工作

在 Java 中,我不确定如何执行此操作,因为 stop_consume() 似乎不可用。

我看到的所有文档都讨论了创建消费者的所有方法,但我似乎找不到任何显示如何阻止它们的东西。

解决此问题的最佳方法是什么?

0 投票
1 回答
1146 浏览

python - 鼠兔没有属性日志

我正在尝试运行一些使用 pika 和龙卷风的 python 代码。我都安装了,但得到错误

两个包都安装在/usr/local/lib/python2.7/dist-packages/

这是我第一次玩python,所以我真的不知道为什么它找不到它。代码中的一切似乎都是正确的:

0 投票
1 回答
2091 浏览

python - RabbitMQ - pika - rabbit.js - node.js

我正在使用 Python、rabbitmq、pika、rabbit.js 和 node.js。

这个想法是向客户发送消息并采取相应的行动。有多种类型的消息正在发送。

所以,在服务器端,我有一个方法可以接收消息和交换将消息发送到:

仍然在服务器端,使用 node.js,我有多个上下文,我相信交换会根据名称调用它们:(注意:“客户端”是指连接到服务器的客户端列表。)

这里发生的是 consumer_1 正在向所有客户端发送消息,而 consumer_2 正在向特定客户端发送消息,并且该部分运行良好。

问题是所有消费者都在接收来自所有交易所的消息。如果我尝试向 consumer_1 发送消息,consumer_2 也会收到该消息。

在实际方面,如果我打电话:

结果将是:

  • 节点将发射到“client_method_1”;
  • 并且还将尝试发送到“client_method_2”。

我如何打电话给特定的消费者而不打电话给其他人?

或者,我应该只声明一个消费者并过滤数据以调用不同的方法吗?

编辑:

按照 TokenMacGuy 的建议,我尝试将交换类型更改为“主题”或“直接”。那应该正是我想要的。

但是总有一个但是,不知何故我无法重新声明交换。当我重新启动节点时,我不断收到此错误:

好的,所以我检查了 rabbitmq 上的 list_exchanged 并且消费者仍然使用“ fanout ”。

下一步是通过停止、重置(然后强制重置)和启动来重置 rabbitmq。

我已向 consumer_1 发送消息

现在 consumer_1 被列为“直接”:

好的,因为我只调用了 consumer_1 并且所有其他都出现在列表中,所以必须有一个包含该信息的文件。在哪里?我不知道!!

无论如何,consumer_1 上会发生相同的 PRECONDITION_FAILED 错误。

有任何想法吗?

0 投票
1 回答
499 浏览

python - 启用持久传递模式后,强制工作人员立即接收失败的消息

我设置了一个RabbitMQ 服务器,在其中使用Python-Pika获取消息。问题是,如果我启用了持久传递模式,并且工作人员无法处理消息。它不会释放消息,而是会一直保留到消息,直到 RabbitMQ 连接被重置。

有没有办法确保无法处理的消息在合理的时间范围内从可用的工作人员(包括同一个工作人员)再次被拾起?

这是我当前的代码

这个想法是我永远不想丢失一条消息,而是希望它被重新发布,或者如果它失败了,我希望它再次被拾起。在工作人员成功处理消息之前,应该始终如此。这通常发生在其中一名工作人员无法打开与 HTTP 服务器的连接时。