我正在尝试在我的代码中实现 Faust 库以使用来自 Kafka 的数据,异步运行一些代码并将数据返回给 Kafka。我遵循了文档,但似乎在我的代码(或我对浮士德如何工作的理解)中犯了一个错误,因为我的应用程序中的浮士德当前一次使用来自 Kafka 的消息,并且只有在将数据返回给 Kafka 之后才会它开始使用另一条消息。
我的代码:
app = faust.App('app_faust', broker=[...], store='rocksdb://')
class Company(faust.Record):
company_id: str
task_id: int
topic_c = app.topic(
'topic_name',
key_type=bytes,
value_type=Company,
)
topic_p = app.topic(
'another_topic_name',
key_type=bytes,
value_type=Company,
)
@app.agent('topic_name', sink=[topic_p], value_type=Company, concurrency=4)
async def test(data: faust.Stream):
async for company in data:
if data and company.company_id and company.task_id:
yield some_function(company)
知道我在做什么错吗?我正在使用浮士德 1.10.4