0

我有来自 2 个主题的 2 个流。假设流 1 和流 2。

我正在做

KStream<String,String> Outstream1 = stream1.selectKey((key,value)-> {
......//some extraction from JsonObject of value part
return event | '|' | timestamp //these fields are extracted from values.
})
.groupByKey()
.reduce(some reduce logic here)
.toStream();

KStream<String,String> Outstream2 = stream2.selectKey((key,value)-> {
......//some extraction from JsonObject of value part
return event | '|' | timestamp //these fields are extracted from values.
})
.groupByKey()
.reduce(some reduce logic here)
.toStream();

//Then here is the join

KStream<String,String> JoinStream = Outstream1.join(outstream2,(dataOfStream1,dataOfStream2) -> {
//Some logic to create the data from the joins
},JoinWindows.of(Duration.ofMinutes(10)));

这是高级别的代码。没有为此逻辑生成输出。我在这里做错了什么?有人可以请指导。我无法找到我们在这里绞尽脑汁的内容。主题也是共同分区的,就像两个主题都有 4 个分区一样。

4

0 回答 0