我正在尝试将 Spark 用于一个非常简单的用例:给定大量文件(90k),其中包含数百万个设备的设备时间序列数据,将给定设备的所有时间序列读取分组到一组文件中(分割)。现在假设我们的目标是 100 个分区,给定的设备数据显示在同一个输出文件中并不重要,只是同一个分区。
鉴于这个问题,我们提出了两种方法来做到这一点 - repartition
thenwrite
或write
withpartitionBy
应用于Writer
. 其中任何一个的代码都非常简单:
repartition
(添加了哈希列以确保与partitionBy
以下代码的比较是一对一的):
df = spark.read.format("xml") \
.options(rowTag="DeviceData") \
.load(file_path, schema=meter_data) \
.withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \
.repartition("partition") \
.write.format("json") \
.option("codec", "org.apache.hadoop.io.compress.GzipCodec") \
.mode("overwrite") \
.save(output_path)
partitionBy
:
df = spark.read.format("xml") \
.options(rowTag="DeviceData") \
.load(file_path, schema=meter_data) \
.withColumn("partition", hash(col("_DeviceName")).cast("Long") % num_partitions) \
.write.format("json") \
.partitionBy(“partition”) \
.option("codec", "org.apache.hadoop.io.compress.GzipCodec") \
.mode("overwrite") \
.save(output_path)
在我们的测试repartition
中比partitionBy
. 为什么是这样?
根据我的理解repartition
,我的 Spark 学习告诉我要尽可能避免这种洗牌。另一方面,partitionBy
(根据我的理解)只对每个节点产生本地排序操作 - 不需要洗牌。我是否误解了一些让我认为partitionBy
会更快的东西?