
We want to do a KStream-KStream join based on a common field (primary key). With the code below, the result is just the two streams merged, without any primary-key constraint.

val userRegions: KStream[String, String] = builder.stream(inputTopic1)
val regionMetrics: KStream[String, String] = builder.stream(inputTopic2)


userRegions.join(regionMetrics)(
  ((regionValue, metricValue) => regionValue + "/" + metricValue),
  JoinWindows.of(Duration.ofMinutes(5L))
).to(outputTopicName)

Could you please suggest how to join two streams based on a common field/column?


1 Answer


To join two streams on a common field/column, you can use the selectKey() function. Here is a snippet that should help.

val userRegions: KStream[String, String] = builder.stream(inputTopic1)
val regionMetrics: KStream[String, String] = builder.stream(inputTopic2)

// New code: re-key both streams on the common field before joining.
// In the Scala DSL, selectKey takes a (key, value) => newKey function.
val userRegionsWithKeys = userRegions.selectKey { (key, value) =>
  // build the join key from the record here and return it
  "key that you want"
}

val regionMetricsWithKeys = regionMetrics.selectKey { (key, value) =>
  // build the join key from the record here and return it
  "key that you want"
}


userRegionsWithKeys.join(regionMetricsWithKeys)(
  ((regionValue, metricValue) => regionValue + "/" + metricValue),
  JoinWindows.of(Duration.ofMinutes(5L))
).to(outputTopicName)
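
For a fuller picture, here is a minimal sketch of the same approach with imports included. It assumes each value is a comma-separated string whose first field is the common field. That format, the `extractKey` helper, the `JoinOnCommonField` object and `buildTopology` are all made up for illustration, so adapt the parsing to your actual data. The serdes import path assumes Kafka 2.7 or later.

```scala
import java.time.Duration

import org.apache.kafka.streams.kstream.JoinWindows
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KStream
// On Kafka versions before 2.7 the implicit serdes live in
// org.apache.kafka.streams.scala.Serdes._ instead.
import org.apache.kafka.streams.scala.serialization.Serdes._

object JoinOnCommonField {

  // Hypothetical helper: assumes each value looks like "<commonField>,<rest>".
  // Returns the trimmed text before the first comma (or the whole value if
  // there is no comma).
  def extractKey(value: String): String =
    value.split(",", 2)(0).trim

  // Wires the topology; topic names are supplied by the caller.
  def buildTopology(builder: StreamsBuilder,
                    inputTopic1: String,
                    inputTopic2: String,
                    outputTopicName: String): Unit = {
    val userRegions: KStream[String, String] = builder.stream[String, String](inputTopic1)
    val regionMetrics: KStream[String, String] = builder.stream[String, String](inputTopic2)

    // Re-key both streams on the common field; the join matches on record keys.
    val userRegionsWithKeys = userRegions.selectKey((_, value) => extractKey(value))
    val regionMetricsWithKeys = regionMetrics.selectKey((_, value) => extractKey(value))

    userRegionsWithKeys
      .join(regionMetricsWithKeys)(
        (regionValue, metricValue) => regionValue + "/" + metricValue,
        JoinWindows.of(Duration.ofMinutes(5L))
      )
      .to(outputTopicName)
  }
}
```

Because selectKey changes the key, Kafka Streams repartitions both streams through internal topics before the join, so records with the same new key end up in the same partition. Note that this sketch does not handle null values; filter them out before selectKey if your topics can contain them.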

I hope this solution helps you.

Thanks

Answered 2021-08-27T20:33:19.037