我们使用 akka 流和 alpakka kafka 来使用来自不同主题的消息,使用主题模式订阅。使用 groupBy() 为每个主题创建子源/流。
我们如何暂停/恢复/控制单个主题的消息处理,而不影响其他主题的消息处理。
以下选项不起作用。
使用 RestartSource 将重新启动源并影响所有主题的处理。
使用 RestartFlow 会丢弃失败的元素,也会影响所有主题的处理。
如果我们必须为所有主题重新启动源并且有额外的开销,那么为每个主题创建一个源是资源密集型的。
Consumer
.committableSource(consumerSettings.withGroupId("test-group"),
Subscriptions.topicPattern("/some/pattern"))
.groupBy(5000, msg -> msg.record().topic())
.mapAsync(1, msg -> business(msg).thenApply(response -> {
if (response.status().isFailure()) {
throw new RuntimeException("Boom");
}
return msg.committableOffset();}))
.mapAsync(1, offset -> offset.commitJavadsl());