1

我正在阅读 Alpakka 中的 Consumer API for Kafka 文档。我遇到了这段代码。据我了解,偏移量是使用 msg.committableOffset() 提交的。那为什么我们需要 .toMat() 和 mapMaterializedValue()。我不能将它附加到 Sink.Ignore() 吗?

Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
      .mapAsync(
          1,
          msg ->
              business(msg.record().key(), msg.record().value())
                  .thenApply(done -> msg.committableOffset()))
      .toMat(Committer.sink(committerSettings.withMaxBatch(1)), Keep.both())
      .mapMaterializedValue(Consumer::createDrainingControl)
      .run(materializer);
4

1 回答 1

1

您不能附加到 Sink.ignore,因为您已经附加了 Commiter.Sink。但是您可以丢弃物化值。

该示例使用 toMat 和 Keep.both 来保留物化值,即来自 Source 的 Control 和来自 Sink 的 Future[Done]。使用这两个值,它会在 mapMaterializedValue 中创建一个 DrainingControl,允许您在停止之前停止流或排出流,或者在流停止时收到通知。

如果您不关心此控件(尽管您应该),您可以使用

Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
      .mapAsync(
          1,
          msg ->
              business(msg.record().key(), msg.record().value())
                  .thenApply(done -> msg.committableOffset()))
      .to(Committer.sink(committerSettings.withMaxBatch(1)))
      .run(materializer);
于 2019-04-20T13:10:19.873 回答