嗨,我正在尝试使用 Alpakka 文档中所示的 Producer api。我可以使用事务源来使用记录,并且创建了生产者,但无法将消息发送到主题无法使用 Alpakka 中的 Transactional.Sink 生产到主题,但我看到启用了幂等生产者。我看到它正在进入逻辑的日志但它没有向 myTopic 产生事件
[info] OakcpKafkaProducer - [Producer clientId=producer-7fe8789c-3171-429e-afbf-d8a8ba12700c, transactionalId=7fe8789c-3171-429e-afbf-d8a8ba12700c] 幂等生产者已启用。
你能帮我理解为什么它可能不会产生主题信息吗
我正在使用 docker 在本地运行我的代码
下面是我的代码
``` Transactional.source(consumerSettings,
Subscriptions.topics(topicNames))
.mapMaterializedValue(innerControl = _)
.map(consumerRecord => {
handleBusiness(consumerRecord)
.flatMap(res => Source.single(res)
.runWith(Transactional.sink(producerSettings,
UUID.randomUUID().toString)))
})
}
source.runWith(Sink.ignore)
And my handleBusiness logics looks like below:
```
private def handleBusiness(consumedMessage: ConsumerMessage.TransactionalMessage[String, String]): Future[Envelope[String, String, PartitionOffset]] = {
(conversion of consumedMessage ) map { message =>
ProducerMessage.single(new ProducerRecord("myTopic", consumedMessage.record.key, message ), consumedMessage.partitionOffset)
}
}```