1

我正在使用 Scala 2.11 和 Akka Streams Kafka 0.17。

我有一个,其中:

  • ASource是使用Source.actorRef. 在这里,actor 被安排在某个固定的时间间隔运行并连续生成消息,这些消息被发送到流中。
  • 我已将 aProducer作为Flow. 生产者推ProducerMessage.Message送到 Kafka 主题。
  • 一些数据库操作。

我在构建 时遇到问题ProducerMessage.Message,如下所示:

final case class Message[K, V, +PassThrough](
    record: ProducerRecord[K, V],
    passThrough: PassThrough
  )

我可以轻松地传递record包含实际消息的参数。但我不知道要在passThrough参数中传递什么。根据文档

passThrough字段可以保存通过Consumer#flow并包含在 中的任何元素Result。当需要在下游操作上传递一些上下文时,这很有用。这可以通过解压缩/压缩来完成,但这更方便。例如,它可以是一个ConsumerMessage.CommittableOffsetConsumerMessage.CommittableOffsetBatch可以稍后在流程中提交。

就我而言,没有任何 Kafka 消费者订阅 Kafka 主题并为我的流生成Source(comittableSourceplainSource)。在这种情况下,我会按照文档中的说明传递消费者偏移量。但就我而言,演员正在模拟这样的消费者。这意味着我无权访问ConsumerMessage.CommittableOffset. 那么我在passThrough这里为参数传递什么?在这种情况下,最佳做法是什么?

4

1 回答 1

0

在向团队转发我的问题后reactive-kafka,我得到了答案。基本上,他们所说的是,如果您没有任何用例pass through,您可以尝试将其设置为NoneNotUsed,或者只是空字符串""

另请注意,如果您使用 a Producer.plainSink,则不需要构造 a ProducerMessage.Message。然后,您可以直接构造一个 Kafka ProducerRecord。该ProducerMessage.Message案例类只是pass through需要或需要的案例的容器。除了要传递的元素之外,它只包含一个 Kafka ProducerRecord

于 2017-11-12T15:15:19.600 回答