0

我正在尝试将生产者连接到消费者的一些变体,在特殊情况下,有时我需要为每条消息生成 1 条额外消息(例如,1 条到输出主题,1 条消息到不同的主题),同时保持保证那。

我正在考虑做 mapConcat 并输出多个 ProducerRecord 对象,我担心边缘情况下的松散保证,即第一条消息足以在该偏移量上发生提交,从而导致第二条消息的潜在丢失。此外,您似乎不能只执行 .flatmap ,因为您将进入图形 API,这会变得更加混乱,因为一旦您合并到提交流程中,您不会忽略重复的偏移量变得更加困难.

Consumer.committableSource(consumerSettings, Subscriptions.topics(inputTopic))
  .map(msg => (msg, addLineage(msg.record.value())))
  .mapConcat(input => 
    if (math.random > 0.25) 
      List(ProducerMessage.Message(
        new ProducerRecord[Array[Byte], Array[Byte]](outputTopic, input._1.record.key(), input._2),
        input._1.committableOffset
      ))
    else List(ProducerMessage.Message(
      new ProducerRecord[Array[Byte], Array[Byte]](outputTopic, input._1.record.key(), input._2),
      input._1.committableOffset
    ),ProducerMessage.Message(
      new ProducerRecord[Array[Byte], Array[Byte]](outputTopic2, input._1.record.key(), input._2),
      input._1.committableOffset
    ))
  )
  .via(Producer.flow(producerSettings))
  .map(_.message.passThrough)
  .batch(max = 20, first => CommittableOffsetBatch.empty.updated(first)) {
    (batch, elem) => batch.updated(elem)
  }
  .mapAsync(parallelism = 3)(_.commitScaladsl())
  .runWith(Sink.ignore)

原始的 1 对 1 文档在这里:https ://doc.akka.io/docs/akka-stream-kafka/current/consumer.html#connecting-producer-and-consumer

有没有人想到/解决了这个问题?

4

1 回答 1

2

Alpakka Kafka 连接器最近引入了flexiFlow支持您的用例:让一个流元素向 Kafka 生成多条消息

于 2018-10-08T08:28:10.317 回答