除了 Matthias J. Sax 所说的之外,还有一个流到流(窗口)连接示例:
https ://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/test /java/io/confluent/examples/streams/StreamToStreamJoinIntegrationTest.java
这适用于带有 Apache Kafka 0.10.1 的 Confluent 3.1.x,即截至 2017 年 1 月的最新版本。有关master
使用较新版本的代码示例,请参见上面存储库中的分支。
这是上面代码示例的关键部分(同样,对于 Kafka 0.10.1),稍微适应了您的问题。请注意,此示例恰好演示了 OUTER JOIN。
long joinWindowSizeMs = TimeUnit.MINUTES.toMillis(5);
long windowRetentionTimeMs = TimeUnit.DAYS.toMillis(30);
final Serde<String> stringSerde = Serdes.String();
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> alerts = builder.stream(stringSerde, stringSerde, "adImpressionsTopic");
KStream<String, String> incidents = builder.stream(stringSerde, stringSerde, "adClicksTopic");
KStream<String, String> impressionsAndClicks = alerts.outerJoin(incidents,
(impressionValue, clickValue) -> impressionValue + "/" + clickValue,
// KStream-KStream joins are always windowed joins, hence we must provide a join window.
JoinWindows.of(joinWindowSizeMs).until(windowRetentionTimeMs),
stringSerde, stringSerde, stringSerde);
// Write the results to the output topic.
impressionsAndClicks.to(stringSerde, stringSerde, "outputTopic");