1

我想让两个浮士德代理监听同一个 kafka 主题,但是每个代理在处理事件之前都使用自己的过滤器,并且它们的事件集不会相交。

在文档中,我们有一个示例: https ://faust.readthedocs.io/en/latest/userguide/streams.html#id4

如果两个代理使用订阅同一主题的流:

 topic = app.topic('orders')

 @app.agent(topic)
 async def processA(stream):
      async for value in stream:
          print(f'A: {value}')

 @app.agent(topic)
  async def processB(stream):
       async for value in stream:
           print(f'B: {value}')

Conductor 会将收到的关于“orders”主题的每条消息转发给两个代理,每当它进入代理流时都会增加引用计数。

当事件被确认时,引用计数减少,当它达到零时,消费者将认为该偏移量“完成”并可以提交它。

下面是过滤器https://faust.readthedocs.io/en/latest/userguide/streams.html#id13

@app.agent() async def process(stream):
    async for value in stream.filter(lambda: v > 1000).group_by(...):
        ...

我使用了一些复杂的过滤器,但结果将流分成两部分,用于具有完全不同逻辑的两个代理。(我不使用 group_by)

如果两个代理一起工作,一切正常。但是,如果我停止它们并重新启动它们,它们将从头开始处理流。因为每一个事件都没有得到代理人之一的承认。如果我确认每个代理中的所有事件,而如果其中一个代理不会启动,那么第二个代理将清除该主题。(如果一个被粉碎并重新启动,指挥将看到三个订阅者,因为它正在等待粉碎的代理响应 20 分钟)。

我只想将事件分为两部分。在这种情况下如何进行适当的同步?

4

1 回答 1

0

faust确认过滤掉的事件时,过滤有一些错误。我建议在从流中消费时不要使用该fault.filter()功能,而是使用简单的if...then...else语句样式,类似于以下内容:

@app.agent(topic)
async def process(stream):
    async for event in stream:
        if event.amount >= 300.0:
            yield event
于 2021-07-12T14:16:36.363 回答