0

我正在使用 Kombu 在 Redis 上构建一个生产者/消费者设置,但我遇到了一个问题。如果我启动一个消费者,然后使用 range(10000) 启动生产者,我可以确认生产者已将所有 10k 项排队,但并非所有 10k 项都被消费者接收。Kombu 或 Redis 是否有我不知道的限制?它似乎可以在 range(9000) 下正常工作,并且所有键/ack 都已正确耗尽。

class ProduceConsume(object):
    def __init__(self, exchange_name):
        exchange = Exchange(exchange_name, type='fanout', durable=False)
        self.queue_name = 'test_queue'
        self.queue = Queue(self.queue_name, exchange)

    def producer(self, inp):
        with BrokerConnection("redis://localhost:6379/15") as conn:
            with conn.SimpleQueue(self.queue) as queue:
                for payload in inp:
                    queue.put(str(payload).zfill(5))
                    print(str(payload).zfill(5))

    def consumer(self):
        with BrokerConnection("redis://localhost:6379/15") as conn:
            with conn.SimpleQueue(self.queue) as queue:
                while True:
                    message = queue.get()
                    message.ack()
                    print(message.payload)
4

1 回答 1

0

不是一个完整的答案,但使用 RabbitMQ 而不是 Redis 没有消息丢失问题。可能是由于非直接交换的问题?随时提交更明智的答案,但希望这可以帮助某人。

更新:这是 Kombu 中的一个错误;见https://github.com/celery/kombu/issues/593

于 2016-05-25T18:10:19.100 回答