2

我需要从单独的源目录读取 JSON 文件并为每个目录创建单独的表。我希望这可以并行完成,但 Spark 不支持嵌套的 RDD,所以目前它是按顺序执行的。有没有一个好的解决方案可以让这些目录并行读取/处理?

这是我正在尝试的示例片段,但由于嵌套的 RDD,它不起作用:

def readJsonCreateTable(tableInfo: (String, String)) {
  val df = spark
           .read
           .json(tableInfo._1)
  df.createOrReplaceTempView(tableInfo._2)
}

val dirList = List(("/mnt/jsondir1", "temptable1"),
                   ("/mnt/jsondir2", "temptable2"),
                   ("/mnt/jsondir3", "temptable3"))
val dirRDD = sc.parallelize(dirList)
dirRDD.foreach(readJsonCreateTable) // Nested RDD error

将最后一行更改为 dirRDD.collect.foreach 有效,但随后该工作未分发并按顺序执行,因此非常慢。

还尝试了 dirRDD.collect.par.foreach,但它只在驱动程序上运行并行线程并且不利用所有其他节点。

我查看了 foreachAsync,但由于嵌套,我不确定在这种情况下异步是否一定是并行的。

这是通过 Databricks 使用 Spark 2.0 和 Scala 2.11。

===========
补充:

我尝试了在 Spark 中返回 FutureAction 的 foreachAsync,但这也给出了错误。

import scala.concurrent._
import scala.concurrent.duration._
.
.
.
val dirFuture = dirRDD.foreachAsync(readJsonCreateTable)
Await.result(dirFuture, 1 second)

显然 SimpleFutureAction 不可序列化

org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.SimpleFutureAction
4

1 回答 1

5

您可以使用 Scala并行集合期货来并行化在 Spark 驱动程序上运行的代码。Spark 驱动程序是线程安全的,因此可以按预期工作。

这是一个使用带有明确指定的线程池的并行集合的示例:

val dirList = List(
  ("dbfs:/databricks-datasets/flights/departuredelays.csv", "departuredelays"),
  ("dbfs:/databricks-datasets/amazon/users/", "users")
).par

val pool = new scala.concurrent.forkjoin.ForkJoinPool(2)

try {
  dirList.tasksupport = new scala.collection.parallel.ForkJoinTaskSupport(pool)
  dirList.foreach { case (filename, tableName) =>
    println(s"Starting to create table for $tableName")
    val df = spark.read.json(filename)
    println(s"Done creating table for $tableName")
    df.createOrReplaceTempView(tableName)
  }
} finally {
  pool.shutdown() // to prevent thread leaks.
  // You could also re-use thread pools across collections.
}

当我在 Databricks 中运行它时,它产生了流式日志输出,表明两个表正在并行加载:

Starting to create table for departuredelays
Starting to create table for users
Done creating table for departuredelays
Done creating table for users

这种并行性也反映在 Spark UI 的作业时间线视图上。

当然,您也可以为此使用 Java 线程。简而言之,从多个线程调用 Spark 驱动程序 API 是安全的,因此请选择您选择的 JVM 并发框架并对 Spark 驱动程序发出并行调用以创建您的表。

于 2017-01-12T02:32:54.213 回答