您如何使用 Akka Streams 持续使用来自 Apache Pulsar 的消息并打印每条消息?
下面是我从pulsar4s库中找到的示例代码。您如何打印消费的消息,而不是将消息发布到另一个主题?
val consumerFn = () => client.consumer(ConsumerConfig(Seq(intopic), Subscription("mysub")))
val producerFn = () => client.producer(ProducerConfig(outtopic))
val control = source(consumerFn, Some(MessageId.earliest))
.map { consumerMessage => ProducerMessage(consumerMessage.data) }
.to(sink(producerFn)).run()