我需要从单独的源目录读取 JSON 文件并为每个目录创建单独的表。我希望这可以并行完成,但 Spark 不支持嵌套的 RDD,所以目前它是按顺序执行的。有没有一个好的解决方案可以让这些目录并行读取/处理?
这是我正在尝试的示例片段,但由于嵌套的 RDD,它不起作用:
def readJsonCreateTable(tableInfo: (String, String)) {
val df = spark
.read
.json(tableInfo._1)
df.createOrReplaceTempView(tableInfo._2)
}
val dirList = List(("/mnt/jsondir1", "temptable1"),
("/mnt/jsondir2", "temptable2"),
("/mnt/jsondir3", "temptable3"))
val dirRDD = sc.parallelize(dirList)
dirRDD.foreach(readJsonCreateTable) // Nested RDD error
将最后一行更改为 dirRDD.collect.foreach 有效,但随后该工作未分发并按顺序执行,因此非常慢。
还尝试了 dirRDD.collect.par.foreach,但它只在驱动程序上运行并行线程并且不利用所有其他节点。
我查看了 foreachAsync,但由于嵌套,我不确定在这种情况下异步是否一定是并行的。
这是通过 Databricks 使用 Spark 2.0 和 Scala 2.11。
===========
补充:
我尝试了在 Spark 中返回 FutureAction 的 foreachAsync,但这也给出了错误。
import scala.concurrent._
import scala.concurrent.duration._
.
.
.
val dirFuture = dirRDD.foreachAsync(readJsonCreateTable)
Await.result(dirFuture, 1 second)
显然 SimpleFutureAction 不可序列化
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.SimpleFutureAction