1

检查下面的代码

@app.agent()
async def process(stream):
    async for value in stream.take(5000, within=5):
        process(value)

代理在 5 秒内异步获取 5000 条记录并进行处理。我不希望代理在前一个处理完成之前再选择 500 万条记录。基本上我想同步运行代理。有没有办法我们可以做到?

4

2 回答 2

1

我认为您可以在代理上将并发设置为 1,这样可以有效地使其同步。

如果您这样做,您可能还会发现修改主题分区很有用,但我对这两个设置之间的关系没有完全了解(只是想指出一个可能有用的途径)。

于 2020-06-30T18:17:40.203 回答
0

我尝试使用以下代码查看工人是否正在执行第二批记录,而第一批的处理尚未完成

@app.agent()
async def process(stream):
    async for value in stream.take(5000, within=5):
        print(1)
        await async.sleep(30)

工人打印1并等待 30 秒打印2。await 语句将控制权交还给事件循环,但在这种情况下它等待,这意味着批处理一个接一个地执行。因此这是同步的。

PS。提交偏移、重新平衡、监控等是由事件循环处理的异步操作。

于 2020-07-02T15:04:27.973 回答