我正在尝试使用 python、asyncio 和asynqp同时使用多个队列。
我不明白为什么我的asyncio.sleep()
函数调用没有任何效果。代码不会停在那里。公平地说,我实际上不明白回调是在哪个上下文中执行的,以及我是否可以完全控制事件循环(这样asyncio.sleep()
调用才有意义)。
如果我必须在回调函数中使用aiohttp.ClientSession.get()
函数调用怎么办?process_msg
我不能这样做,因为它不是协程。必须有一种方法超出我目前对 asyncio 的理解。
#!/usr/bin/env python3
import asyncio
import asynqp
USERS = {'betty', 'bob', 'luis', 'tony'}
def process_msg(msg):
asyncio.sleep(10)
print('>> {}'.format(msg.body))
msg.ack()
async def connect():
connection = await asynqp.connect(host='dev_queue', virtual_host='asynqp_test')
channel = await connection.open_channel()
exchange = await channel.declare_exchange('inboxes', 'direct')
# we have 10 users. Set up a queue for each of them
# use different channels to avoid any interference
# during message consumption, just in case.
for username in USERS:
user_channel = await connection.open_channel()
queue = await user_channel.declare_queue('Inbox_{}'.format(username))
await queue.bind(exchange, routing_key=username)
await queue.consume(process_msg)
# deliver 10 messages to each user
for username in USERS:
for msg_idx in range(10):
msg = asynqp.Message('Msg #{} for {}'.format(msg_idx, username))
exchange.publish(msg, routing_key=username)
loop = asyncio.get_event_loop()
loop.run_until_complete(connect())
loop.run_forever()