1

我正在使用库学习 ZIO 与 Apache Kafka 的集成zio-kafka。在Github 主项目页面的示例中,他们使用一个mapM函数来提交一个块的偏移量:

Consumer.subscribeAnd(Subscription.topics("topic150"))
  .plainStream(Serde.string, Serde.string)
  .tap(cr => putStrLn(s"key: ${cr.record.key}, value: ${cr.record.value}"))
  .map(_.offset)
  .aggregateAsync(Consumer.offsetBatches)
  .mapM(_.commit)
  .runDrain

但是,恕我直言,提交偏移量是流上的终端操作。使用 a 有什么区别ZSink

Consumer.subscribeAnd(Subscription.topics("topic150"))
  .plainStream(Serde.string, Serde.string)
  .tap(cr => putStrLn(s"key: ${cr.record.key}, value: ${cr.record.value}"))
  .map(_.offset)
  .aggregateAsync(Consumer.offsetBatches)
  .run(ZSink.foreach(_.commit))
4

0 回答 0