我有一个 Kafka 主题,我们称之为摄取,它每秒接收一个条目x
。我有一个要在此数据上运行的进程,但它必须一次运行 100 个事件。因此,我想将这些条目批处理在一起并将它们发送到一个名为batched-ingest的新主题。这两个主题将如下所示...
ingest = [entry, entry, entry, ...]
batched-ingest = [[entry_0, entry_1, ..., entry_99]]
使用浮士德的正确方法是什么?我现在的解决方案是这个......
app = faust.App("explore", value_serializer="raw")
ingest = app.topic('ingest')
ingest_batch = app.topic('ingest-batch')
@app.agent(ingest, sink=[ingest_batch])
async def test(stream):
async for values in stream.take(10, within=1000):
yield values
我不确定这是否是《浮士德》中这样做的正确方法。如果是这样,我应该设置什么within
以使其始终等到len(values) = 100
?