我有一个 Kafka 流应用程序,我在其中读取一个主题,进行聚合并在 KTable 中实现。然后我创建一个 Stream 并在流上运行一些逻辑。现在在流处理中,我想使用前面提到的 KTable 中的一些数据。启动流应用程序后,如何再次访问 KTable 流?我不想将 KTable 推送到新主题。
KStream<String, MyClass> source = builder.stream("my-topic");
KTable<Windowed<String>, Long> kTable =
source.groupBy((key, value) -> value.getKey(),
Grouped.<String, MyClass >as("repartition-1")
.withKeySerde(new Serdes.String())
.withValueSerde(new MyClassSerDes()))
.windowedBy(TimeWindows.of(Duration.ofSeconds(5)))
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("test-store")
.withKeySerde(new Serdes.String())
.withValueSerde(Serdes.Long()));
在这里,我想使用 kTable 中的数据。
inputstream.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
.count(Materialized.<myKey, Long, WindowStore<Bytes, byte[]>>as("str")
.withRetention(Duration.ofMinutes(30)))
.toStream()
.filter((k, v) -> {
// Here get the count for the previous Window.
// Use that count for some computation here.
}