1

我刚刚了解了 SnappyData(并观看了一些有关它的视频),它看起来很有趣,主要是当说性能可能比常规 spark 作业快很多倍时。

以下代码(片段)能否利用 SnappyData 功能来提高作业的性能并提供相同的行为?

Dataset<EventData> ds = spark
  .readStream()
  .format("kafka")
  (...)
  .as(Encoders.bean(EventData.class)); 

KeyValueGroupedDataset<String, EventData> kvDataset = ds.groupByKey(new MapFunction<EventData, String>() {
  public String call(EventData value) throws Exception {
    return value.getId();
  }
}, Encoders.STRING());

Dataset<EventData> processedDataset = kvDataset.mapGroupsWithState(new MapGroupsWithStateFunction<String, EventData, EventData, EventData>(){
  public EventData call(String key, Iterator<EventData> values, GroupState<EventData> state) throws Exception {

    /* state control code */

    EventData processed = EventHandler.validate(key,values);

    return processed;

}}, Encoders.bean(EventData.class), Encoders.bean(EventData.class));

StreamingQuery query = processedDataset.writeStream()
  .outputMode("update")
  .format("console")
  .start();
4

1 回答 1

1

我怀疑 SnappyData 会优化这个管道。优化设计用于 DataFrames(托管内存表)和 GroupBy、Join、scan 等常见运算符。

在您的示例中,我会想象映射函数会主导处理时间。也许,可以将其转换Dataset<EventData>Dataset<Row>(使用 toDF()),将其存储在一个表中,使用内置的 spark-sql 运算符或 UDF,然后对其进行操作。这可能会显着改变摄取率。

在这个简单的示例中,您将输出到控制台。在现实世界中,我假设您将此状态摄取到某个商店中。这就是 SnappyData 可以发挥重要作用的地方。

于 2018-01-17T01:58:41.717 回答