我正在使用库学习 ZIO 与 Apache Kafka 的集成zio-kafka
。在Github 主项目页面的示例中,他们使用一个mapM
函数来提交一个块的偏移量:
Consumer.subscribeAnd(Subscription.topics("topic150"))
.plainStream(Serde.string, Serde.string)
.tap(cr => putStrLn(s"key: ${cr.record.key}, value: ${cr.record.value}"))
.map(_.offset)
.aggregateAsync(Consumer.offsetBatches)
.mapM(_.commit)
.runDrain
但是,恕我直言,提交偏移量是流上的终端操作。使用 a 有什么区别ZSink
?
Consumer.subscribeAnd(Subscription.topics("topic150"))
.plainStream(Serde.string, Serde.string)
.tap(cr => putStrLn(s"key: ${cr.record.key}, value: ${cr.record.value}"))
.map(_.offset)
.aggregateAsync(Consumer.offsetBatches)
.run(ZSink.foreach(_.commit))