9

基于apache Kafka docs KStream-to-KStream Joins are always windowed joins,我的问题是如何控制窗口的大小?保持主题数据的大小是否相同?或者例如,我们可以保留 1 个月的数据,但只加入过去一周的数据流?

有没有什么好的例子来显示一个窗口化的 KStream-to-kStream 窗口化连接?

就我而言,假设我有 2 个 KStream,kstream1并且kstream2我希望能够加入 10 天kstream1到 30 天的kstream2.

4

2 回答 2

14

这是绝对可能的。当您定义 Stream 运算符时,您明确指定连接窗口大小。

KStream stream1 = ...;
KStream stream2 = ...;
long joinWindowSizeMs = 5L * 60L * 1000L; // 5 minutes
long windowRetentionTimeMs = 30L * 24L * 60L * 60L * 1000L; // 30 days

stream1.leftJoin(stream2,
                 ... // add ValueJoiner
                 JoinWindows.of(joinWindowSizeMs)
);

// or if you want to use retention time

stream1.leftJoin(stream2,
                 ... // add ValueJoiner
                 (JoinWindows)JoinWindows.of(joinWindowSizeMs)
                                         .until(windowRetentionTimeMs)
);

有关更多详细信息,请参阅http://docs.confluent.io/current/streams/developer-guide.html#joining-streams

滑动窗口基本上定义了一个附加的连接谓词。在类似 SQL 的语法中,这将类似于:

SELECT * FROM stream1, stream2
WHERE
   stream1.key = stream2.key
   AND
   stream1.ts - before <= stream2.ts
   AND
   stream2.ts <= stream1.ts + after

before == after == joinWindowSizeMs这个例子中。如果您使用and明确设置这些值before,也可以有不同的值。afterJoinWindows#before()JoinWindows#after()

源主题的保留时间完全独立于windowRetentionTimeMs应用于 Kafka Streams 本身创建的变更日志主题的指定。窗口保留允许将无序记录彼此连接,即延迟到达的记录(请记住,Kafka 有一个基于偏移量的排序保证,但关于时间戳,记录可能是无序的) .

于 2017-01-17T22:41:18.443 回答
5

除了 Matthias J. Sax 所说的之外,还有一个流到流(窗口)连接示例: https ://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/test /java/io/confluent/examples/streams/StreamToStreamJoinIntegrationTest.java

这适用于带有 Apache Kafka 0.10.1 的 Confluent 3.1.x,即截至 2017 年 1 月的最新版本。有关master使用较新版本的代码示例,请参见上面存储库中的分支。

这是上面代码示例的关键部分(同样,对于 Kafka 0.10.1),稍微适应了您的问题。请注意,此示例恰好演示了 OUTER JOIN。

long joinWindowSizeMs = TimeUnit.MINUTES.toMillis(5);
long windowRetentionTimeMs = TimeUnit.DAYS.toMillis(30);

final Serde<String> stringSerde = Serdes.String();
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> alerts = builder.stream(stringSerde, stringSerde, "adImpressionsTopic");
KStream<String, String> incidents = builder.stream(stringSerde, stringSerde, "adClicksTopic");

KStream<String, String> impressionsAndClicks = alerts.outerJoin(incidents,
    (impressionValue, clickValue) -> impressionValue + "/" + clickValue,
    // KStream-KStream joins are always windowed joins, hence we must provide a join window.
    JoinWindows.of(joinWindowSizeMs).until(windowRetentionTimeMs),
    stringSerde, stringSerde, stringSerde);

// Write the results to the output topic.
impressionsAndClicks.to(stringSerde, stringSerde, "outputTopic");
于 2017-01-24T10:11:24.820 回答