我正在尝试使用 Spark Streaming 从 NFS 上的 CSV 文件中收集数据。我的代码非常简单,到目前为止我只在 spark-shell 中运行它,但即使在那里我也遇到了一些问题。
我正在运行带有 6 个工作人员的独立 Spark 主机的 spark-shell,并将以下参数传递给 spark-shell:
--master spark://master.host:7077 --num-executors 3 --conf spark.cores.max=10
这是代码:
val schema = spark.read.option("header", true).option("mode", "PERMISSIVE").csv("/nfs/files_to_collect/schema/schema.csv").schema
val data = spark.readStream.option("header", true).schema(schema).csv("/nfs/files_to_collect/jobs/jobs*")
val query = data.writeStream.format("console").start()
该 NFS 路径中有 2 个文件,每个文件大小约为 200MB。当我调用 writeStream 时,我收到以下警告:
“17/11/13 22:56:31 WARN TaskSetManager:第 2 阶段包含一个非常大的任务 (106402 KB)。建议的最大任务大小为 100 KB。”
查看 Spark 主 UI,我看到只使用了一个执行程序——创建了四个任务,每个任务读取每个 CSV 文件的约 50%。
我的问题是:
1) NFS 路径中的文件越多,驱动程序似乎需要的内存就越多 - 如果有 2 个文件,它会崩溃,直到我将其内存增加到 2g。4个文件需要不少于8g。驱动程序在做什么,它需要这么多内存?
2) 如何控制读取 CSV 文件的并行度?我注意到文件越多,创建的任务就越多,但是可以手动控制吗?