我从 RabbitMQ 写了一个简单的队列消费者(使用kombu):
import socket
from kombu import Connection, Exchange, Queue
def handle_message(payload, message):
payload = int(message.payload)
index = payload % 1000
print('queue%s: ' %(index+1) + message.payload)
message.ack()
while True:
try:
# connections
with Connection('amqp://1000:1000@localhost:5672//1000') as conn:
while True:
chan = conn.channel()
chan.basic_qos(prefetch_size=0, prefetch_count=10, a_global=False)
with conn.Consumer(channel=chan, callbacks=[handle_message]) as consumer:
queue = Queue('queue1', routing_key='queue1', exchange=Exchange())
consumer.add_queue(queue)
consumer.consume()
try:
print('before drain_events')
conn.drain_events(timeout=5)
print('after drain_events')
consumer.cancel_by_queue('queue1')
if not consumer.consuming_from('queue1'):
consumer.recover(requeue=True)
except socket.timeout as e:
print('timeout')
except Exception as e:
print("An exception was caught %s", e.message);
在这段代码中,我向消费者添加了一个队列(“queue1”)并开始从中消费,并在每个 drain_event 之后使用 cancel_by_queue 删除队列。
我得到的结果是,在第一轮 drain_event 回调“handle_message”被调用,但在下一次迭代中没有。我用错了吗??
输出:
before drain_events
queue1: 6221
after drain_events
before drain_events
after drain_events
before drain_events
after drain_events
before drain_events
after drain_events
before drain_events
.
.
.
如果我把
with Connection('amqp://1000:1000@localhost:5672//1000') as conn:
在内部循环内部它可以工作 - 但我不想每次删除队列并再次开始消费时都创建一个新连接。
谢谢
亚尼夫