我需要构建以下图表:
val graph = getFromTopic1 ~> doSomeWork ~> writeToTopic2 ~> commitOffsetForTopic1
但是试图在 Reactive Kafka 中实现它让我陷入了困境。这似乎是错误的,因为这让我觉得这是一个相对常见的用例:我想在 Kafka 主题之间移动数据,同时保证至少一次交付语义。
现在并行写完全没问题
val fanOut = new Broadcast(2)
val graph = getFromTopic1 ~> doSomeWork ~> fanOut ~> writeToTopic2
fanOut ~> commitOffsetForTopic1
此代码有效,因为writeToTopic2
可以使用 实现ReactiveKafka#publish(..)
,它返回一个Sink
. 但是当我的应用程序崩溃时,我会失去 ALOS 保证和数据。
所以我真正需要的是编写一个写入 Kafka 主题的 Flow。我尝试过使用Flow.fromSinkAndSource(..)
自定义GraphStage
,但遇到了流经数据的类型问题;例如,提交的内容commitOffsetForTopic1
不应包含在 中writeToTopic2
,这意味着我必须一直保留一个包含两条数据的包装器对象。但这与writeToTopic2
接受ProducerMessage[K,V]
. 我最近解决这个问题的尝试遇到了反应式 kafka 库中的私有和最终类(扩展/包装/替换底层 SubscriptionActor)。
我真的不想维护一个分叉来实现这一点。我错过了什么?为什么这么难?我是否以某种方式试图构建一个病态图节点,或者这个用例是一个疏忽……还是我在我一直在挖掘的文档和源代码中以某种方式遗漏了一些完全明显的东西?
当前版本是 0.10.1。我可以根据要求添加更多关于我的许多尝试的详细信息。