I just learned about SnappyData (and watched a few videos about it), and it looks interesting, mainly because of the claim that it can be many times faster than a regular Spark job.
Could the following code (snippet) take advantage of SnappyData's features to improve the job's performance while keeping the same behavior?
// Read the event stream from Kafka (source options elided)
Dataset<EventData> ds = spark
        .readStream()
        .format("kafka")
        (...)
        .as(Encoders.bean(EventData.class));

// Group the stream by event id so state can be tracked per key
KeyValueGroupedDataset<String, EventData> kvDataset = ds.groupByKey(
        new MapFunction<EventData, String>() {
            @Override
            public String call(EventData value) throws Exception {
                return value.getId();
            }
        }, Encoders.STRING());

// Maintain per-key state and validate each group of events
Dataset<EventData> processedDataset = kvDataset.mapGroupsWithState(
        new MapGroupsWithStateFunction<String, EventData, EventData, EventData>() {
            @Override
            public EventData call(String key, Iterator<EventData> values, GroupState<EventData> state) throws Exception {
                /* state control code */
                EventData processed = EventHandler.validate(key, values);
                return processed;
            }
        }, Encoders.bean(EventData.class), Encoders.bean(EventData.class));

// Write the processed stream to the console in update mode
StreamingQuery query = processedDataset.writeStream()
        .outputMode("update")
        .format("console")
        .start();