我正在使用 Scala 2.11 和 Akka Streams Kafka 0.17。
我有一个流,其中:
- A
Source
是使用Source.actorRef
. 在这里,actor 被安排在某个固定的时间间隔运行并连续生成消息,这些消息被发送到流中。 - 我已将 a
Producer
作为Flow
. 生产者推ProducerMessage.Message
送到 Kafka 主题。 - 一些数据库操作。
我在构建 时遇到问题ProducerMessage.Message
,如下所示:
final case class Message[K, V, +PassThrough](
record: ProducerRecord[K, V],
passThrough: PassThrough
)
我可以轻松地传递record
包含实际消息的参数。但我不知道要在passThrough
参数中传递什么。根据文档:
该
passThrough
字段可以保存通过Consumer#flow
并包含在 中的任何元素Result
。当需要在下游操作上传递一些上下文时,这很有用。这可以通过解压缩/压缩来完成,但这更方便。例如,它可以是一个ConsumerMessage.CommittableOffset
或ConsumerMessage.CommittableOffsetBatch
可以稍后在流程中提交。
就我而言,没有任何 Kafka 消费者订阅 Kafka 主题并为我的流生成Source
(comittableSource
或plainSource
)。在这种情况下,我会按照文档中的说明传递消费者偏移量。但就我而言,演员正在模拟这样的消费者。这意味着我无权访问ConsumerMessage.CommittableOffset
. 那么我在passThrough
这里为参数传递什么?在这种情况下,最佳做法是什么?