9

我对numPartitions以下方法中参数的行为感到困惑:

  1. DataFrameReader.jdbc
  2. Dataset.repartition

关于参数的官方文档如下DataFrameReader.jdbcnumPartitions

numPartitions:分区数。这与 lowerBound(包括)、upperBound(不包括)一起形成了用于生成的 WHERE 子句表达式的分区步长,这些表达式用于均匀地拆分列 columnName。

官方文档Dataset.repartition

返回具有完全numPartitions分区的新数据集。


我目前的理解:

  1. 方法中的numPartition参数DataFrameReader.jdbc控制从数据库中读取数据的并行度
  2. 中的numPartition参数Dataset.repartition控制将其写入磁盘时将生成的输出文件的数量DataFrame

我的问题:

  1. 如果我DataFrame通过读取DataFrameReader.jdbc然后将其写入磁盘(不调用repartition方法),那么输出中的文件是否仍然与我DataFrame在调用磁盘后将其写入磁盘一样多repartition
  2. 如果上述问题的答案是:
    • 是的:那么在使用方法(带参数)读取repartition的方法上调用方法是多余的吗?DataFrameDataFrameReader.jdbcnumPartitions
    • 否:那么请纠正我理解的失误。同样在这种情况下,方法的numPartitions参数不应该DataFrameReader.jdbc被称为“并行”之类的东西吗?
4

1 回答 1

13

简短回答:两种方法中参数的行为(几乎)没有区别numPartitions


read.jdbc(..numPartitions..)

在这里,numPartitions参数控制:

  1. 将与MySQL(或任何其他RDBM)建立的并行连接数以将数据读DataFrame
  2. 读取所有后续操作的并行度,DataFrame包括写入磁盘,直到repartition在其上调用方法

repartition(..numPartitions..)

此处numPartitions的参数控制在执行 的任何操作(包括写入磁盘)时将展示的并行度DataFrame


所以基本上使用方法DataFrame在读取MySQL表上获得的spark.read.jdbc(..numPartitions..)行为相同(在对其执行的操作中表现出相同程度的并行性),就好像它是在没有并行性的情况下读取的,然后在其上调用该方法(显然具有相同的值)repartition(..numPartitions..)numPartitions


要回答确切的问题:

如果我通过 DataFrameReader.jdbc 读取 DataFrame 然后将其写入磁盘(不调用 repartition 方法),那么输出中的文件是否仍然与我在调用 repartition 后将 DataFrame 写入磁盘时一样多它?

是的

假设读取任务已通过提供适当的参数 ( , , & )进行了并行化,则对结果的所有操作(包括写入)都将并行执行。在这里引用官方文档columnNamelowerBoundupperBoundnumPartitionsDataFrame

numPartitions:表读写中可用于并行的最大分区数。这也决定了并发 JDBC 连接的最大数量。如果要写入的分区数超过此限制,我们会在写入前通过调用 coalesce(numPartitions) 将其减少到此限制。


是的:那么在使用 DataFrameReader.jdbc 方法(带有 numPartitions 参数)读取的 DataFrame 上调用 repartition 方法是否多余?

是的

除非您调用方法的其他变体repartition(采用columnExprs参数的方法),否则调用repartition这样的DataFrame(具有相同的numPartitions)参数是多余的。但是,我不确定在已经并行化的对象上强制使用相同程度的并行性是否也会导致不必要的数据混洗。一旦我遇到它,将更新答案。 DataFrameexecutors

于 2018-02-22T11:17:24.160 回答