0

我是 faust 和 asyncio 的新手,我试图理解为什么这段代码最终会同步打印它的输出。

import faust
import asyncio

app = faust.App(
    'service',
    broker='kafka://localhost:9092',
    value_serializer='raw',
)

greetings_topic = app.topic('greetings')
    

@app.agent(greetings_topic)
async def greet(greetings):
    async for greeting in greetings:
        print("Start")
        await asyncio.sleep(20)
        print(greeting)

例如,我在问候主题中预先创建了 3 条消息,使用:

faust -A generic_service send greetings "hello: $(date +"%T")"

我假设我会先得到所有“开始”,然后 20 秒后得到所有“你好:......”,但我有这个:

faust -A service worker -l info
...
[2022-02-06 14:35:56,290] [31853] [WARNING] start 
[2022-02-06 14:36:16,291] [31853] [WARNING] b'hello: 14:35:30' 
[2022-02-06 14:36:16,293] [31853] [WARNING] start 
[2022-02-06 14:36:36,294] [31853] [WARNING] b'hello: 14:35:40' 
[2022-02-06 14:36:36,295] [31853] [WARNING] start 
[2022-02-06 14:36:56,301] [31853] [WARNING] b'hello: 14:35:48' 

有人可以帮助理解我做错了什么或者我的假设是否正确吗?谢谢 !

4

0 回答 0