0

我正在尝试在我的代码中实现 Faust 库以使用来自 Kafka 的数据,异步运行一些代码并将数据返回给 Kafka。我遵循了文档,但似乎在我的代码(或我对浮士德如何工作的理解)中犯了一个错误,因为我的应用程序中的浮士德当前一次使用来自 Kafka 的消息,并且只有在将数据返回给 Kafka 之后才会它开始使用另一条消息。

我的代码:

app = faust.App('app_faust', broker=[...], store='rocksdb://')



class Company(faust.Record):
    company_id: str
    task_id: int


topic_c = app.topic(
    'topic_name',
    key_type=bytes,
    value_type=Company,
)
topic_p = app.topic(
    'another_topic_name',
    key_type=bytes,
    value_type=Company,
)

@app.agent('topic_name', sink=[topic_p], value_type=Company, concurrency=4)
async def test(data: faust.Stream):
    async for company in data:
        if data and company.company_id and company.task_id:
            yield some_function(company)

知道我在做什么错吗?我正在使用浮士德 1.10.4

4

1 回答 1

0

根据文档:

The agent modifying the table cannot process the source topic out of order, so only agents with concurrency=1 are allowed to update tables.

当您的应用程序没有 RocksDB 时,它可以工作。

https://faust.readthedocs.io/en/latest/userguide/application.html?highlight=concurrency#app-table-define-a-new-table

于 2022-01-26T04:14:07.283 回答