我是使用 RabbitMQ 和 Pika 的新手,所以如果答案很明显,请原谅......
我们正在输入一些数据并将结果传递到我们的 rabbitmq 消息队列中。该队列正在被将数据写入弹性搜索的进程使用。
数据的生成速度比它可以输入弹性搜索的速度要快,因此队列会增长并且几乎不会缩小。
我们正在使用 pika 并收到警告:
UserWarning: Pika: Write buffer exceeded warning threshold at X bytes and an estimated X frames behind.
这会持续一段时间,直到 Pika 简单地崩溃并出现一条奇怪的错误消息:
NameError: global name 'log' is not defined
我们正在使用 Pika BlockingConnection 对象 (http://pika.github.com/connecting.html#blockingconnection)。
我解决此问题的计划是使用 add_backpressure_callback 函数来创建一个函数,该函数将在time.sleep(0.5)
每次我们需要应用背压时调用。但是,这似乎是一个过于简单的解决方案,并且必须有一种更合适的方式来处理这样的事情。
我猜想队列的填充速度比消耗的速度快是一种常见的情况。我正在寻找一个例子,甚至是一些关于什么是减缓队列速度的最佳方法的建议。
谢谢!