我正在尝试将数据从单一来源写入多个 DataSink(Mongo 和 Postgres DB)。传入数据
Dataset<Row> df = spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1")
.load();
Dataset<Row> personalDetails = df.selectExpr("name", "id", "age");
personalDetails.writeStream()
.outputMode(OutputMode.Update())
.foreachBatch((dataframe, bachId) -> {
dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
.option("uri", "mongodb://localhost/employee")
.option("database", "employee")
.option("collection", "PI").save();
}).start();
Dataset<Row> salDetails = df.selectExpr("basicSal", "bonus");
salDetails.writeStream()
.outputMode(OutputMode.Update())
.foreachBatch((dataframe, bachId) -> {
dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
.option("uri", "mongodb://localhost/employee")
.option("database", "employee")
.option("collection", "SAL").save();
}).start();
问题是,我可以看到 Spark 正在打开两个 Streams 并两次读取相同的事件。是否可以读取一次并应用不同的转换并写入不同的集合?