1

对于下面的代码,stream1 和 stream2 都单独运行良好,我可以看到输出,但连接的流根本不记录任何内容。我感觉它与连接窗口有关,但是来自两个流的数据几乎同时进入。

val stream = builder.stream(stringSerde, byteArraySerde, "topic")

val stream1 = stream
  .filter((key, value) => somefilter(key, value))
  .through(stringSerde, byteArraySerde, "topic1")

val stream2 = stream
  .filter((key, value) => someotherfilter(key, value))
  .through(stringSerde, byteArraySerde, "topic2")

val joinedStream = stream1
  .join(stream2, (value1: Array[Byte], value2: Array[Byte]) => {
    println("wont print anything")
    return somerandomdata
  },
  JoinWindows.of("othertopic").within(10000L),
  stringSerde, byteArraySerde, byteArraySerde)
4

1 回答 1

1

为了加入它们,两个主题的键不应该相同吗?

我认为 Javadoc 解释了这一点: https ://kafka.apache.org/0102/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html

这也可能是一个有趣的阅读: https ://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics

于 2017-06-05T14:05:08.503 回答