我正在阅读 Alpakka 中的 Consumer API for Kafka 文档。我遇到了这段代码。据我了解,偏移量是使用 msg.committableOffset() 提交的。那为什么我们需要 .toMat() 和 mapMaterializedValue()。我不能将它附加到 Sink.Ignore() 吗?
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
.mapAsync(
1,
msg ->
business(msg.record().key(), msg.record().value())
.thenApply(done -> msg.committableOffset()))
.toMat(Committer.sink(committerSettings.withMaxBatch(1)), Keep.both())
.mapMaterializedValue(Consumer::createDrainingControl)
.run(materializer);