2
final KStream<String, EmpModel> empModelStream = getMapOperator(empoutStream);
final KStream<String, EmpModel> empModelinput = getMapOperator(inputStream);

//  empModelinput.print();
//  empModelStream.print();

empModelStream.join(empModelinput, new ValueJoiner<EmpModel, EmpModel, Object>() {

    @Override
    public Object apply(EmpModel paramV1, EmpModel paramV2) {
        System.out.println("Model1 "+paramV1.getKey());
        System.out.println("Model2 "+paramV2.getKey());
        return paramV1;
    }

},JoinWindows.of("2000L"));

我得到错误:

无效的拓扑构建:KSTREAM-MAP-0000000003 和 KSTREAM-MAP-0000000004 不可连接

4

1 回答 1

3

如果要加入两个KStreams,则必须确保两者具有相同数量的分区。(参见http://docs.confluent.io/current/streams/developer-guide.html#joining-streams中的“注”框)

如果您使用 Kafka v0.10.1+,将自动进行重新分区(参见http://docs.confluent.io/current/streams/upgrade-guide.html#auto-repartitioning)。

对于 Kafka v0.10.0.x,您有两种选择:

  1. 确保原始输入主题确实具有相同数量的分区
  2. .through("my-repartitioning-topic")或者,在加入之前添加对其中一个KStreams的调用。在启动 Streams 应用程序之前,您需要创建"my-repartioning-topic"具有正确分区数的主题(即,与第二个KStream原始输入主题的分区数相同)
于 2017-02-27T17:57:44.880 回答