我们在 Spark 中有一个用例,我们希望将历史数据从数据库加载到 Spark,并继续向 Spark 添加新的流数据,然后我们可以对整个最新数据集进行分析。
据我所知,Spark SQL 和 Spark Streaming 都不能将历史数据与流数据结合起来。然后我发现了 Spark 2.0 中的 Structured Streaming,它似乎是为这个问题而构建的。但是经过一些实验,我仍然无法弄清楚。这是我的代码:
SparkSession spark = SparkSession
.builder()
.config(conf)
.getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
// Load historical data from MongoDB
JavaMongoRDD<Document> mongordd = MongoSpark.load(jsc);
// Create typed dataset with customized schema
JavaRDD<JavaRecordForSingleTick> rdd = mongordd.flatMap(new FlatMapFunction<Document, JavaRecordForSingleTick>() {...});
Dataset<Row> df = spark.sqlContext().createDataFrame(rdd, JavaRecordForSingleTick.class);
Dataset<JavaRecordForSingleTick> df1 = df.as(ExpressionEncoder.javaBean(JavaRecordForSingleTick.class));
// ds listens to a streaming data source
Dataset<Row> ds = spark.readStream()
.format("socket")
.option("host", "127.0.0.1")
.option("port", 11111)
.load();
// Create the typed dataset with customized schema
Dataset<JavaRecordForSingleTick> ds1 = ds
.as(Encoders.STRING())
.flatMap(new FlatMapFunction<String, JavaRecordForSingleTick>() {
@Override
public Iterator<JavaRecordForSingleTick> call(String str) throws Exception {
...
}
}, ExpressionEncoder.javaBean(JavaRecordForSingleTick.class));
// ds1 and df1 have the same schema. ds1 gets data from the streaming data source, df1 is the dataset with historical data
ds1 = ds1.union(df1);
StreamingQuery query = ds1.writeStream().format("console").start();
query.awaitTermination();
我收到错误“org.apache.spark.sql.AnalysisException:不支持流和批处理 DataFrames/Datasets 之间的联合;” 当我联合()两个数据集时。
有人可以帮我吗?我会走错方向吗?