1

我正在尝试将数据从单一来源写入多个 DataSink(Mongo 和 Postgres DB)。传入数据

Dataset<Row> df = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "topic1")
        .load();

Dataset<Row> personalDetails = df.selectExpr("name", "id", "age");


personalDetails.writeStream()
    .outputMode(OutputMode.Update())
    .foreachBatch((dataframe, bachId) -> {
            dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
                    .option("uri", "mongodb://localhost/employee")
                    .option("database", "employee")
                    .option("collection", "PI").save();
    }).start();

Dataset<Row> salDetails = df.selectExpr("basicSal", "bonus");
salDetails.writeStream()
    .outputMode(OutputMode.Update())
    .foreachBatch((dataframe, bachId) -> {
            dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
                    .option("uri", "mongodb://localhost/employee")
                    .option("database", "employee")
                    .option("collection", "SAL").save();
    }).start();

问题是,我可以看到 Spark 正在打开两个 Streams 并两次读取相同的事件。是否可以读取一次并应用不同的转换并写入不同的集合?

4

2 回答 2

2

您应该缓存 DataFrame。见这里

写入多个位置 - 如果要将流式查询的输出写入多个位置,则可以简单地多次写入输出 DataFrame/Dataset。但是,每次写入尝试都可能导致重新计算输出数据(包括可能重新读取输入数据)。为避免重新计算,您应该缓存输出 DataFrame/Dataset,将其写入多个位置,然后取消缓存。

他们的例子:

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  batchDF.write.format(...).save(...)  // location 1
  batchDF.write.format(...).save(...)  // location 2
  batchDF.unpersist()
}

您可以将所有代码放在一个中foreachBatch,并将数据帧写入您的 2 个接收器。您可以通过缓存数据帧并selectExpr在此缓存的数据帧上执行并保存它来做到这一点。

作为旁注- 请注意,在任何情况下,如果您想要“全有或全无”(即您不希望您写给 mongo 而不是 postgres 的情况),您必须只使用一个foreachBatch,否则(如果你有 2 foreachBatch,就像你的问题一样)你有 2 个独立的批次 - 对于相同的数据,一个可能会在另一个成功时失败。

于 2020-10-05T06:02:35.740 回答
0

最后,我可以使用Spark 3.1.1使用Structured Streaming Table API解决这个问题。

从 Spark 3.1 开始,您还可以使用 DataStreamReader.table() 将表读取为流数据帧,并使用 DataStreamWriter.toTable() 将流数据帧写入表:

    // Stream to myTable
    df.writeStream()
      .option("checkpointLocation", "/tmp/test")
      .toTable("myTable");
    // Stream from myTable
    Dataset<Row> tableDf = spark.readStream()
                                .table("myTable");

参考:Spark 3.1 表 API

有了这个,我可以解决从源“kafka”多次读取的问题。这样,它将只创建一个 Kafka 消费者并将数据流式传输到 Table。从那里它将使用多个流从表中读取并在表数据集之上应用额外的转换。所以完整的例子如下:

Dataset<Row> df = spark
        .readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", "topic1")
        .load();

// Stream to myTable
df.writeStream.option("checkpointLocation", "c:/temp/test").toTable("myTable");

// Stream from myTable
Dataset<Row> tableDf = spark.readStream().table("myTable");

Dataset<Row> personalDetails = tableDf.selectExpr("name", "id", "age");


personalDetails.writeStream()
    .outputMode(OutputMode.Update())
    .foreachBatch((dataframe, bachId) -> {
            dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
                    .option("uri", "mongodb://localhost/employee")
                    .option("database", "employee")
                    .option("collection", "PI").save();
    }).start();

Dataset<Row> salDetails = tableDf.selectExpr("basicSal", "bonus");
salDetails.writeStream()
    .outputMode(OutputMode.Update())
    .foreachBatch((dataframe, bachId) -> {
            dataframe.write().format("com.mongodb.spark.sql.DefaultSource").mode(SaveMode.Append)
                    .option("uri", "mongodb://localhost/employee")
                    .option("database", "employee")
                    .option("collection", "SAL").save();
    }).start();
于 2021-05-22T14:26:24.950 回答