0

嗨,我正在尝试使用 Alpakka 文档中所示的 Producer api。我可以使用事务源来使用记录,并且创建了生产者,但无法将消息发送到主题无法使用 Alpakka 中的 Transactional.Sink 生产到主题,但我看到启用了幂等生产者。我看到它正在进入逻辑的日志但它没有向 myTopic 产生事件

[info] OakcpKafkaProducer - [Producer clientId=producer-7fe8789c-3171-429e-afbf-d8a8ba12700c, transactionalId=7fe8789c-3171-429e-afbf-d8a8ba12700c] 幂等生产者已启用。

你能帮我理解为什么它可能不会产生主题信息吗

我正在使用 docker 在本地运行我的代码

下面是我的代码

``` Transactional.source(consumerSettings,
            Subscriptions.topics(topicNames))
            .mapMaterializedValue(innerControl = _)
            .map(consumerRecord => {

              handleBusiness(consumerRecord)
                .flatMap(res => Source.single(res)
                       .runWith(Transactional.sink(producerSettings, 
                                           UUID.randomUUID().toString)))

            })

        }
          source.runWith(Sink.ignore)

And my handleBusiness logics looks like below:

   ```
 private def handleBusiness(consumedMessage: ConsumerMessage.TransactionalMessage[String, String]): Future[Envelope[String, String, PartitionOffset]]  = {

       (conversion of consumedMessage ) map { message =>

            ProducerMessage.single(new ProducerRecord("myTopic", consumedMessage.record.key, message ), consumedMessage.partitionOffset)

     }


 }```


4

1 回答 1

0

我也可以使用一个流来做到这一点事务源需要有一个接收器/流,如下所示

Transactional.source(consumerSettings,
                      Subscriptions.topics(topicNames))
                      .mapMaterializedValue(innerControl = _)
                      .mapAsync(5) { msg => business(msg)}
                      .via(Transactional.flow(producersettings, transactions-id))
于 2020-05-20T18:47:28.983 回答