我正在尝试将asyncio
功能集成到我的 kafka 主题侦听器中,并且遇到了问题(python 中的异步编程非常新)。
我有一个confluent-kafka consumer
正在听一个主题的创建。该主题经常有消息,性能是最重要的(因此引入了异步 io)。
main() 函数如下所示:
def main(self):
while True:
try:
msg = consumer.poll(timeout=5.0)
if msg is None:
continue
asyncio.ensure_future(handle_message(message))
finally:
consumer.close()
本质上,我想以线性方式从主题中提取消息,但是消息的处理应该是异步的……这意味着发生的任何数据库 I/Ohandle_message
等都将被异步处理(我设置了等待等)在该功能中正确)。问题是我似乎从未在 asyncio.ensure_future() 中开始执行。 当我从 kafka 主题中提取消息时,如何不断地将任务添加到异步循环中? 使用confluent-kafka==1.5.0