4

使用 Kombu 和 RabbitMQ 实现经典的发布/订阅设计模式。我创建了一个创建主题的生产者:

from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', 'topic', durable=False)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')

with Connection('amqp://guest:guest@localhost//') as conn:
    producer = conn.Producer(serializer='json')
    producer.publish('Hello World!',
                      exchange=media_exchange, routing_key='video',
                      declare=[video_queue])

然后我创建了一个消费者从发布者那里消费:

from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', type='topic', durable=False)
video_queue = Queue('video', exchange=media_exchange, routing_key='video')

def process_media(body, message):
    print(body)
    #message.ack()

with Connection('amqp://guest:guest@localhost//') as conn:
    with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
        # Process messages and handle events on all channels
        while True:
            conn.drain_events()

然后启动两个消费者,每个消费者在一个单独的终端中;两者都等待消息:

terminal 1: python consumer.py
terminal 2: python consumer.py

当我运行生产者时,只有一个消费者收到消息。

4

2 回答 2

6

生产者在交换中发布,而不是在队列中。队列由消费者定义。当为每个消费者使用不同的队列名称时,所有人都会收到消息。当为同一个队列使用多个消费者时,它就是负载平衡,这就是为什么只有一个消费者收到消息的原因。

于 2014-08-15T21:29:49.433 回答
3

澄清一下,队列中的消息被“消费”了,即第一个消费者消费它,消息不再在队列中,这就是为什么第二个消费者什么都没有得到。

为同一消息拥有 2 个单独的消费者 - 使用 2 个单独的队列,即 video_queue1video_queue2,声明并绑定到交换器media_exchange,使用相同的密钥video

生产者.py

from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', 'topic', durable=False)
video_queue1 = Queue('video1', exchange=media_exchange, routing_key='video')
video_queue2 = Queue('video2', exchange=media_exchange, routing_key='video')


with Connection('amqp://guest:guest@localhost//') as conn:
    producer = conn.Producer(serializer='json')
    producer.publish('Hello World!',
                      exchange=media_exchange, routing_key='video',
                      declare=[video_queue1, video_queue2])

消费者1.py

from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', type='topic', durable=False)
video_queue = Queue('video1', exchange=media_exchange, routing_key='video')

def process_media(body, message):
    print(body)
    #message.ack()

with Connection('amqp://guest:guest@localhost//') as conn:
    with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
        # Process messages and handle events on all channels
        while True:
            conn.drain_events()

消费者2.py

from kombu import Connection, Exchange, Queue

media_exchange = Exchange('media', type='topic', durable=False)
video_queue = Queue('video2', exchange=media_exchange, routing_key='video')

def process_media(body, message):
    print(body)
    #message.ack()

with Connection('amqp://guest:guest@localhost//') as conn:
    with conn.Consumer(video_queue, callbacks=[process_media]) as consumer:
        # Process messages and handle events on all channels
        while True:
            conn.drain_events()
于 2016-05-17T07:25:02.297 回答