0

我正在尝试将asyncio功能集成到我的 kafka 主题侦听器中,并且遇到了问题(python 中的异步编程非常新)。

我有一个confluent-kafka consumer正在听一个主题的创建。该主题经常有消息,性能是最重要的(因此引入了异步 io)。

main() 函数如下所示:

def main(self):
    while True:
        try:
            msg = consumer.poll(timeout=5.0)
            if msg is None:
                continue

            asyncio.ensure_future(handle_message(message))
        finally:
            consumer.close()     

本质上,我想以线性方式从主题中提取消息,但是消息的处理应该是异步的……这意味着发生的任何数据库 I/Ohandle_message等都将被异步处理(我设置了等待等)在该功能中正确)。问题是我似乎从未在 asyncio.ensure_future() 中开始执行。 当我从 kafka 主题中提取消息时,如何不断地将任务添加到异步循环中? 使用confluent-kafka==1.5.0

4

0 回答 0