我是 faust 和 asyncio 的新手,我试图理解为什么这段代码最终会同步打印它的输出。
import faust
import asyncio
app = faust.App(
'service',
broker='kafka://localhost:9092',
value_serializer='raw',
)
greetings_topic = app.topic('greetings')
@app.agent(greetings_topic)
async def greet(greetings):
async for greeting in greetings:
print("Start")
await asyncio.sleep(20)
print(greeting)
例如,我在问候主题中预先创建了 3 条消息,使用:
faust -A generic_service send greetings "hello: $(date +"%T")"
我假设我会先得到所有“开始”,然后 20 秒后得到所有“你好:......”,但我有这个:
faust -A service worker -l info
...
[2022-02-06 14:35:56,290] [31853] [WARNING] start
[2022-02-06 14:36:16,291] [31853] [WARNING] b'hello: 14:35:30'
[2022-02-06 14:36:16,293] [31853] [WARNING] start
[2022-02-06 14:36:36,294] [31853] [WARNING] b'hello: 14:35:40'
[2022-02-06 14:36:36,295] [31853] [WARNING] start
[2022-02-06 14:36:56,301] [31853] [WARNING] b'hello: 14:35:48'
有人可以帮助理解我做错了什么或者我的假设是否正确吗?谢谢 !