I have a KStream<String,Event> that should be windowed with windowedBy and aggregated, but the aggregation runs out of memory:
java.lang.OutOfMemoryError: Java heap space
The KStream DSL looks like this:
TimeWindows timeWindows = TimeWindows.of(Duration.ofDays(1)).advanceBy(Duration.ofMillis(1));
Initializer<History> historyInitializer = History::new;
Aggregator<String, Event, History> historyAggregator = (key, value, aggregate) -> {
    aggregate.key = value.uuid;
    aggregate.addHistoryEventWindow(value);
    return aggregate;
};
KTable<String, History> historyWindowed = eventStreamRaw
    .filter((key, value) -> value != null)
    .groupByKey(Grouped.with(Serdes.String(), this.eventSerde))
    // segment our messages into 1-day windows
    .windowedBy(timeWindows)
    .aggregate(historyInitializer, historyAggregator, Named.as("name"), Materialized.with(Serdes.String(), this.historySerde))
    .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded()))
    .groupBy(
        (key, value) -> new KeyValue<String, History>(
            value.key + "|+|" + key.window().start() + "|+|" + key.window().end(), value),
        Grouped.with(Serdes.String(), this.historySerde))
    .aggregate(History::new, (key, value, aggValue) -> value, (key, value, aggValue) -> value,
        Materialized.with(Serdes.String(), this.historySerde));
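After reading some articles (for example, Kafka Streams Window By & RocksDB Tuning), I noticed that I probably have to configure the "Materialized" store with a retention of "1 day + 1 milli".
But trying to add that did not work for me: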
final Materialized<String, History, WindowStore<Bytes, byte[]>> store = Materialized.<String, History, WindowStore<Bytes, byte[]>>as("eventstore")
    .withKeySerde(Serdes.String())
    .withValueSerde(this.historySerde)
    .withRetention(Duration.ofDays(1).plus(Duration.ofMillis(1)));
KTable<String, History> historyWindowed = eventStreamRaw
...
.aggregate(historyInitializer, historyAggregator, Named.as("name"), store)
The Java compiler reports the following error:
The method
aggregate(Initializer<VR>, Aggregator<? super String,? super Event,VR>, Named, Materialized<String,VR,WindowStore<Bytes,byte[]>>)
in the type TimeWindowedKStream<String,Event> is not applicable for the arguments
(Initializer<History>, Aggregator<String,Event,History>, Named, Materialized<String,History,WindowStore<Bytes,byte[]>>)
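Honestly, I don't get it. The arguments are correct; the VR type is History.
So, do you have any idea what I am missing?
The idea behind this windowedBy KTable is to have a state that holds all events of one day for a "thing". Suppose a new alert is produced: I want to attach all events of that "thing" for the given day to the alert. I would then do a leftJoin from the KStream Alert to the KTable History. Is this the best way to add historical data to a Kafka event? Is there a way to "look up" the last x days of a KStream of events? I have already looked at a KStream Alert to KStream Event leftJoin, but that produces an output for every new KStream event, so from my point of view it is not practical.
Thanks a lot for your help. I hope it is just a simple fix. Highly appreciated!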