我想让两个浮士德代理监听同一个 kafka 主题,但是每个代理在处理事件之前都使用自己的过滤器,并且它们的事件集不会相交。
在文档中,我们有一个示例: https ://faust.readthedocs.io/en/latest/userguide/streams.html#id4
如果两个代理使用订阅同一主题的流:
topic = app.topic('orders') @app.agent(topic) async def processA(stream): async for value in stream: print(f'A: {value}') @app.agent(topic) async def processB(stream): async for value in stream: print(f'B: {value}')
Conductor 会将收到的关于“orders”主题的每条消息转发给两个代理,每当它进入代理流时都会增加引用计数。
当事件被确认时,引用计数减少,当它达到零时,消费者将认为该偏移量“完成”并可以提交它。
下面是过滤器https://faust.readthedocs.io/en/latest/userguide/streams.html#id13:
@app.agent() async def process(stream): async for value in stream.filter(lambda: v > 1000).group_by(...): ...
我使用了一些复杂的过滤器,但结果将流分成两部分,用于具有完全不同逻辑的两个代理。(我不使用 group_by)
如果两个代理一起工作,一切正常。但是,如果我停止它们并重新启动它们,它们将从头开始处理流。因为每一个事件都没有得到代理人之一的承认。如果我确认每个代理中的所有事件,而如果其中一个代理不会启动,那么第二个代理将清除该主题。(如果一个被粉碎并重新启动,指挥将看到三个订阅者,因为它正在等待粉碎的代理响应 20 分钟)。
我只想将事件分为两部分。在这种情况下如何进行适当的同步?