我想知道在使用spark-csv导入源数据时如何在 Spark 中强制使用后续的、更适当分区的 DataFrame 。
概括:
spark-csv
似乎不支持像sc.textFile()
那样在导入时显式分区。- 虽然它“免费”为我提供了推断模式,但默认情况下,当我在集群中使用 8 个执行程序时,我得到的返回的 DataFrame 通常只有 2 个分区。
- 即使具有更多分区的后续 DataFrame 被缓存
cache()
并用于进一步处理(在导入源文件后立即),Spark 作业历史仍然显示出令人难以置信的任务分布偏差 - 2 个执行程序将拥有绝大多数任务而不是我期望的更均匀的分布。
不能发布数据,但代码只是一些简单的连接,通过添加几列.withColumn()
,然后通过非常基本的线性回归spark.mlib
。
下面是来自 Spark History UI 的比较图像,显示了每个执行程序的任务(最后一行是驱动程序)。
注意:无论是否调用DataFrame repartition()
,我都会得到相同的倾斜任务分布。spark-csv
我如何“强制”Spark 基本上忘记那些初始 DataFrames 并从更适当的分区 DataFrames 开始,或者强制 spark-csv 以某种方式对它的 DataFrames 进行不同的分区(不分叉/修改它的源)?
我可以使用 解决这个问题sc.textFile(file, minPartitions)
,但我希望我不必求助于它,因为它spark-csv
提供的类型很好的模式之类的东西。