6

我正在测试kombu是如何工作的。我计划在几个项目中更换鼠兔。我看到kombu 有很多文档,但是使用我在文档中找到的一些消息会丢失。这是代码:

from kombu import Connection, Producer
conn = Connection('amqp://localhost:5672')
def errback(exc, interval):
     logger.error('Error: %r', exc, exc_info=1)
     logger.info('Retry in %s seconds.', interval)
producer = Producer(conn)
publish = conn.ensure(producer, producer.publish, errback=errback, max_retries=3)
for i in range(1, 200000):
   publish({'hello': 'world'}, routing_key='test_queue')
   time.sleep(0.001)

当它发布时,我关闭了几次连接并继续发布,但在队列中大约有 60000 条消息,所以有很多丢失的消息。

我尝试了不同的选择,例如:

publish({'hello': 'world'}, retry=True, mandatory=True, routing_key='hipri')

谢谢!

4

1 回答 1

11

问题是默认情况下 Kombu 不使用“确认”,您必须使用:

        conn = Connection('amqp://localhost:5672', transport_options={'confirm_publish': True})

谢谢

于 2014-01-09T09:40:55.030 回答