We want to do Kstream-Kstream join based on the common Field(primary key). Currently with the below code we are getting result as just merging 2 Streams without any primary key constraint.
val userRegions: KStream[String, String] = builder.stream(inputTopic1)
val regionMetrics: KStream[String, String] = builder.stream(inputTopic2)
userRegions.join(regionMetrics)(
((regionValue, metricValue) => regionValue + "/" + metricValue),
JoinWindows.of(Duration.ofMinutes(5L))
).to(outputTopicName)
Could you please suggest how to join 2 Streams based on common field/Column.