2

我是使用 RabbitMQ 和 Pika 的新手,所以如果答案很明显,请原谅......

我们正在输入一些数据并将结果传递到我们的 rabbitmq 消息队列中。该队列正在被将数据写入弹性搜索的进程使用。

数据的生成速度比它可以输入弹性搜索的速度要快,因此队列会增长并且几乎不会缩小。

我们正在使用 pika 并收到警告:

UserWarning: Pika: Write buffer exceeded warning threshold at X bytes and an estimated X frames behind.

这会持续一段时间,直到 Pika 简单地崩溃并出现一条奇怪的错误消息:

NameError: global name 'log' is not defined

我们正在使用 Pika BlockingConnection 对象 (http://pika.github.com/connecting.html#blockingconnection)。

我解决此问题的计划是使用 add_backpressure_callback 函数来创建一个函数,该函数将在time.sleep(0.5)每次我们需要应用背压时调用。但是,这似乎是一个过于简单的解决方案,并且必须有一种更合适的方式来处理这样的事情。

我猜想队列的填充速度比消耗的速度快是一种常见的情况。我正在寻找一个例子,甚至是一些关于什么是减缓队列速度的最佳方法的建议。

谢谢!

4

1 回答 1

1

有趣的问题,正如您正确指出的那样,这可能很常见。我在 Stack Overflow 上看到了另一个相关问题,其中包含一些提示

Pika:写缓冲区超出警告

此外,您可能想考虑扩大您的弹性搜索,这可能是您想要解决的根本瓶颈。在 elasticsearch.org 网站上快速浏览一下

“分散式

Elastic Search 的主要特点之一是其分布式特性。索引被分解成碎片,每个碎片都有 0 个或更多副本。集群中的每个数据节点都托管一个或多个分片,并充当协调者将操作委托给正确的分片。重新平衡和路由是在幕后自动完成的。"

(......虽然不确定插入是否也是分布式和可扩展的)

毕竟,RabbitMQ 不应该无限增长队列。还可能希望查看扩展 RabbitMQ 本身,例如通过在 RabbitMQ 配置中使用每个队列进程等。

干杯!

于 2012-08-17T08:43:34.070 回答