4

我有一个名为 的 RabbitMQ 主题交换experiment。我正在构建一个消费者,我想在其中接收路由键以“foo”开头的所有消息以及路由键以“bar”开头的所有消息。

根据 RabbitMQ 文档,并根据我自己在管理 UI 中的实验,应该可以有一个交换、一个队列和两个连接它们的绑定(foo.#和)。bar.#

我不知道如何使用 Kombu 的 ConsumerMixin 来表达这一点。我觉得我应该能够做到:

q = Queue(exchange=exchange, routing_key=['foo.#', 'bar.#'])

...但它根本不喜欢那样。我也试过:

q.bind_to(exchange=exchange, routing_key='foo.#')
q.bind_to(exchange=exchange, routing_key='bar.#')

...但每次我尝试我都会得到:

kombu.exceptions.NotBoundError: Can't call method on Queue not bound to a channel

......我猜鬃毛是有感觉的。但是,我在 mixin 的界面中看不到某个位置,一旦它们绑定到通道,我就可以轻松地挂接到队列上。这是基本(工作)代码:

from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin


class Worker(ConsumerMixin):
    exchange = Exchange('experiment', type='topic')
    q = Queue(exchange=exchange, routing_key='foo.#', exclusive=True)

    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=[self.q], callbacks=[self.on_task])]

    def on_task(self, body, message):
        print body
        message.ack()


if __name__ == '__main__':
    with Connection('amqp://guest:guest@localhost:5672//') as conn:
        worker = Worker(conn)
        worker.run()

...这有效,但只给我foo消息。除了为我感兴趣的每个路由键创建一个新队列并将它们全部传递给消费者之外,有没有一种干净的方法可以做到这一点?

4

1 回答 1

9

经过一番挖掘,我找到了一种与我最初的想法相当接近的方法来实现这一点。与其将routing_key字符串传递给队列,不如传递一个bindings列表。列表中的每个元素都是binding指定交换和路由键的对象的实例。

一个例子值一千字:

from kombu import Exchange, Queue, binding

exchange = Exchange('experiment', type='topic')
q = Queue(exchange=exchange, bindings=[
    binding(exchange, routing_key='foo.#'),
    binding(exchange, routing_key='bar.#')
], exclusive=True)

而且效果很好!

于 2016-05-13T16:14:47.207 回答