问题标签 [pika]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1927 浏览

python - 如何在长期工作的消费者中提供与 Pika 的并发性?

简短版:如何防止在远程过程调用情况下阻塞 Pika?

长版:

Pika 示例都没有演示我的用例。

我有一个 Tornado 服务器,它通过 AMQP(RabbitMQ、Pika)与其他进程/机器通信。这些其他进程的定义不是很明确,但它们在大多数情况下会返回数据(请参阅RabbitMQ 网站上的 RPC 示例)。有时,一个进程可能需要花费极长的时间来处理大量信息,但它不应该完全阻止较小的请求被该进程处理。或者,远程服务器可能因为发送了 Web 请求而阻塞。把它想象成一个 Web 服务器,但是使用 AMQP 而不是 HTTP。

由于 Pika 文档声称它不是线程安全的,因此我无法将连接传递给多个线程(或进程,就此而言)。我想要做的是启动一个新进程,并将一个套接字事件(用于该程序的管道)添加到 Pika IOLoop,就像我可以使用 Tornado 一样。Pika IOLoop 与 Tornado IOLoop 有很大不同,它似乎不支持添加多个处理程序;它似乎在一个套接字上使用一个“轮询器”进行操作。

我想避免这个包需要 Tornado 包,因为我只会使用 IOLoop。这不是不可能的,但我想看看我的其他选择是什么,或者是否可以通过以某种方式连接多个 Pika IOLoops/Pollers 来解决我的问题。RabbitMQ 的文档说,工人通常可以通过增加更多来“扩大规模”。我想避免为传入的每个请求创建连接(如果它们快速传入)。

0 投票
1 回答
2928 浏览

python - Pika basic_publish 在多个队列上发布时挂起

我需要在交换机上设置多个队列。我想创建一个连接,然后声明多个队列(这可行),然后在多个队列上发布消息(这不起作用)。

我设置了一些测试代码来执行此操作,但它每次都在第二次发布时挂断。我认为它不喜欢在不关闭连接的情况下在多个队列上发布,因为当我在单个队列上发布(甚至单个队列上的多条消息)时,此代码有效。

我需要添加一些东西来完成这项工作吗?我真的很想不必关闭发布之间的连接。此外,当我让我的消费者启动时,当我在多个队列上发送到 basic_publish() 时,他们看不到任何东西。当我在单个队列上发布时,我确实看到消息几乎立即出现。

0 投票
1 回答
1363 浏览

python - Pika:写缓冲区超出警告

我们的软件程序运行良好 5 个月,现在突然我们开始收到 Pika 警告,最终导致异常。

Pika 0.9.5 UserWarning:写入缓冲区超出警告阈值。

我搜索了很多论坛,但很少满意。描述的一种解决方案是完全忽略这些警告,但我对此有点怀疑。在这件事上的任何帮助将不胜感激。这是非常紧急的。

谢谢

0 投票
2 回答
2281 浏览

python - Pika 和 gevent 的随机超时错误

我一直在尝试通过使用 Pika 库(由 gevent 修补的猴子)在我的 gevent 程序中使用 RabbitMQ,gevent 喜欢随机抛出超时错误。

我该怎么办?我可以使用另一个库吗?

0 投票
3 回答
2271 浏览

python - 同步 AMQP 发布

我知道有许多库在 python 中实现 AMQP 支持。不过,我需要的是一个库,它允许我以同步方式进行 AMQP 发布,因为它将在 WSGI 应用程序中使用,因此与队列代理交互的通常异步回调驱动方式将是那里有点格格不入。

系统的其他部分使用pika来支持 AMQP,但它是异步的,即使那里存在某种“阻塞”连接,我也不想使用它。

当然,如果所有其他方法都失败了,则可以为每个 WSGI 进程维护一个 Pika 事件循环。另一个问题是我在当前稳定版本的 Pika 中发现了一些令人讨厌的 (IMO) 错误,我宁愿使用其他东西。

重申:

  • 我需要做 basic.publish (有“确认”支持!这样我就知道消息什么时候没有真正发布)
  • 以同步方式
  • 对于 rabbitmq(显然,“纯”AMQP 也可以工作)
  • 来自 python WSGI 应用程序
0 投票
1 回答
827 浏览

python - Python pika 库零星行为,丢弃消息

我在生产系统中使用 pika 并注意到 BlockingConnection 和 SelectConnection 都在这里和那里丢弃了一条消息,没有明显的错误或警告。不幸的是,我无法发布代码,但想向可能经历过这种行为的其他用户征求任何指导或轶事。听说 RabbitMQ 非常稳固,所以开始质疑 pika 的可靠性,想知道是否还有更值得信赖的替代品?

0 投票
2 回答
363 浏览

rabbitmq - 从 RabbitMQ 队列中弹出一个元素

我有一个队列,我已经从中塞满了一些 n 元素。

我想从中取出1个元素,然后退出回调。pika示例都使用了回调机制,这在应用程序结构中确实没有意义。

定义回调如下

不起作用,因为消息留在队列中

这样做的惯用语是什么?

0 投票
1 回答
681 浏览

rabbitmq - Pika/RabbitMQ:正确使用 add_backpressure_callback

我是使用 RabbitMQ 和 Pika 的新手,所以如果答案很明显,请原谅......

我们正在输入一些数据并将结果传递到我们的 rabbitmq 消息队列中。该队列正在被将数据写入弹性搜索的进程使用。

数据的生成速度比它可以输入弹性搜索的速度要快,因此队列会增长并且几乎不会缩小。

我们正在使用 pika 并收到警告:

这会持续一段时间,直到 Pika 简单地崩溃并出现一条奇怪的错误消息:

我们正在使用 Pika BlockingConnection 对象 (http://pika.github.com/connecting.html#blockingconnection)。

我解决此问题的计划是使用 add_backpressure_callback 函数来创建一个函数,该函数将在time.sleep(0.5)每次我们需要应用背压时调用。但是,这似乎是一个过于简单的解决方案,并且必须有一种更合适的方式来处理这样的事情。

我猜想队列的填充速度比消耗的速度快是一种常见的情况。我正在寻找一个例子,甚至是一些关于什么是减缓队列速度的最佳方法的建议。

谢谢!

0 投票
1 回答
169 浏览

rabbitmq - 如何使用 routing_key 和 queues

我正在设置一个消费者,它将监听来自两个不同来源的消息。我想对来自这两个来源的消息进行不同的回调(尽管欢迎其他解决方案)。

我对 rabbitmq 和 pika 很陌生,我还没有掌握细节。但我想知道的是:

我应该使用不同的队列并设置两个

对于我的回调或者我应该用路由键做一些技巧吗?

0 投票
1 回答
70 浏览

python - 尝试连接时如何提前超时

我有一个应用程序正在尝试连接到 rabbitmq-server,但是如果它无法连接到服务器,我希望我的应用程序在指定的秒数内超时。

我的问题是我不知道该怎么做。

澄清一下,当我的制片人尝试连接时,我想早点超时,因为现在最多需要 20-30 秒。