重置与 kafka 代理的连接时,我看到了这个问题。在这种情况下,faust 应用程序继续处理消息,但无法提交偏移量,并出现以下错误:
Could not send <class 'aiokafka.protocol.transaction.AddOffsetsToTxnRequest_v0'>: StaleMetadata('Broker id 8 not in current metadata')
这种情况一直持续到 faust 应用程序重新启动。
一旦应用程序重新启动,它会重新处理消息并开始提交偏移量
结果,处理了许多重复的消息。