3

我从 RabbitMQ 写了一个简单的队列消费者(使用kombu):

import socket
from kombu import Connection, Exchange, Queue

def handle_message(payload, message):
   payload = int(message.payload)
   index = payload % 1000
   print('queue%s: ' %(index+1) + message.payload)
   message.ack()

while True:
    try:
        # connections
        with Connection('amqp://1000:1000@localhost:5672//1000') as conn:

            while True:
                chan = conn.channel()
                chan.basic_qos(prefetch_size=0, prefetch_count=10, a_global=False)

                with conn.Consumer(channel=chan,  callbacks=[handle_message]) as consumer:
                    queue = Queue('queue1', routing_key='queue1', exchange=Exchange())
                    consumer.add_queue(queue)
                    consumer.consume()

                    try:
                        print('before drain_events')
                        conn.drain_events(timeout=5)
                        print('after drain_events')

                        consumer.cancel_by_queue('queue1')
                        if not consumer.consuming_from('queue1'):
                            consumer.recover(requeue=True)
                    except socket.timeout as e:
                        print('timeout')

    except Exception as e:
        print("An exception was caught %s", e.message);

在这段代码中,我向消费者添加了一个队列(“queue1”)并开始从中消费,并在每个 drain_event 之后使用 cancel_by_queue 删除队列。

我得到的结果是,在第一轮 drain_event 回调“handle_message”被调用,但在下一次迭代中没有。我用错了吗??

输出:

before drain_events
queue1: 6221
after drain_events
before drain_events
after drain_events
before drain_events
after drain_events
before drain_events
after drain_events
before drain_events
.
.
.

如果我把

with Connection('amqp://1000:1000@localhost:5672//1000') as conn:

在内部循环内部它可以工作 - 但我不想每次删除队列并再次开始消费时都创建一个新连接。

谢谢

亚尼夫

4

0 回答 0