12

我有 2 个 Kafka 主题流式传输来自不同来源的完全相同的内容,因此如果其中一个来源出现故障,我可以获得高可用性。我正在尝试使用 Kafka Streams 0.10.1.0 将 2 个主题合并为 1 个输出主题,这样我就不会错过任何有关失败的消息,并且在所有源都启动时没有重复。

使用leftJoinKStream 的方法时,其中一个主题可以毫无问题地关闭(次要主题),但是当主要主题关闭时,没有任何内容发送到输出主题。这似乎是因为,根据Kafka Streams 开发者指南

KStream-KStream leftJoin 总是由来自主流的记录驱动

因此,如果没有来自主流的记录,它不会使用来自辅助流的记录,即使它们存在。一旦主流重新联机,输出就会正常恢复。

我也尝试过使用outerJoin(添加重复记录),然后转换为 KTable 和 groupByKey 以消除重复项,

KStream mergedStream = stream1.outerJoin(stream2, 
    (streamVal1, streamVal2) -> (streamVal1 == null) ? streamVal2 : streamVal1,
    JoinWindows.of(2000L))

mergedStream.groupByKey()
            .reduce((value1, value2) -> value1, TimeWindows.of(2000L), stateStore))
            .toStream((key,value) -> value)
            .to(outputStream)

但我仍然偶尔会得到重复。我也commit.interval.ms=200经常使用让 KTable 发送到输出流。

处理此合并以从多个相同的输入主题中获得一次输出的最佳方法是什么?

4

1 回答 1

8

使用任何类型的连接都不能解决您的问题,因为您总是会丢失结果(在某些流停止的情况下内连接)或与null(左连接或外连接,以防两个流都在线)“重复” )。有关 Kafka Streams 中连接语义的详细信息,请参阅https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics

因此,我建议使用可以使用 、 或 与 DSL 混合搭配的KStream process()处理器transform()API transformValues()。有关更多详细信息,请参阅如何使用 Kafka Stream DSL 使用处理器过滤键和值

您还可以将自定义存储添加到您的处理器(如何将自定义 StateStore 添加到 Kafka Streams DSL 处理器?)以使重复过滤容错。

于 2016-11-28T17:40:19.230 回答