我们正在尝试使用 Akka Streams 和 Alpakka Kafka 来消费服务中的事件流。为了处理事件处理错误,我们使用 Kafka 自动提交和多个队列。例如,如果我们有user_created
想要从产品服务中消费的主题,我们也会创建user_created_for_products_failed
和user_created_for_products_dead_letter
。这两个额外的主题与特定的 Kafka 消费者组耦合。如果一个事件处理失败,它会进入失败队列,我们会在五分钟内再次尝试消费——如果再次失败,它就会进入死信。
在部署时,我们希望确保我们不会丢失事件。所以我们试图在停止应用程序之前停止流。正如我所说,我们正在使用自动提交,但所有这些“飞行”的事件都尚未处理。一旦流和应用程序停止,我们就可以部署新代码并再次启动应用程序。
阅读文档后,我们已经看到了该KillSwitch
功能。我们在其中看到的问题是该shutdown
方法按我们的预期Unit
返回。Future[Unit]
我们不确定使用它是否会丢失事件,因为在测试中它看起来太快而无法正常工作。
ActorSystem
作为一种解决方法,我们为每个流创建一个并使用该terminate
方法(返回 a Future[Terminate]
)。这个解决方案的问题是,我们认为创建ActorSystem
每个流不会很好地扩展,并且terminate
需要很长时间才能解决(在测试中,关闭最多需要一分钟)。
你遇到过这样的问题吗?是否有更快的方法(与 相比ActorSystem.terminate
)来停止流并确保Source
已处理所有已发出的事件?