5

我正在使用 spark sql 对我的数据集运行查询。查询的结果非常小,但仍然是分区的。

我想合并生成的 DataFrame 并按列对行进行排序。我试过了

DataFrame result = sparkSQLContext.sql("my sql").coalesce(1).orderBy("col1")
result.toJSON().saveAsTextFile("output")

我也试过

DataFrame result = sparkSQLContext.sql("my sql").repartition(1).orderBy("col1")
result.toJSON().saveAsTextFile("output")

输出文件按块排序(即分区是有序的,但数据帧没有作为一个整体排序)。例如,而不是

1, value
2, value
4, value
4, value
5, value
5, value
...

我明白了

2, value
4, value
5, value
-----------> partition boundary
1, value
4, value
5, value
  1. 获得查询结果绝对排序的正确方法是什么?
  2. 为什么不将数据框合并到单个分区中?
4

2 回答 2

3

我想在这里提到几件事。1-源代码显示 orderBy 语句在内部调用排序 api 并将全局排序设置为 true 。因此,在输出级别缺少排序表明在写入目标时排序丢失。我的观点是,对 orderBy 的调用总是需要全局顺序。

2-使用剧烈合并,例如在您的情况下强制使用单个分区,可能非常危险。我建议你不要这样做。源代码表明调用 coalesce(1) 可能会导致上游转换使用单个分区。这将是残酷的表现。

3-您似乎希望 orderBy 语句使用单个分区执行。我不认为我同意这种说法。这将使 Spark 成为一个非常愚蠢的分布式框架。

如果您同意或不同意声明,请告诉我社区。

无论如何,您如何从输出中收集数据?

也许输出实际上包含已排序的数据,但是您为了从输出中读取而执行的转换/操作是造成订单丢失的原因。

于 2016-01-20T23:09:15.027 回答
2

orderBy 将在您合并后生成新的分区。要拥有单个输出分区,请重新排序操作...

DataFrame result = spark.sql("my sql").orderBy("col1").coalesce(1)
result.write.json("results.json")

正如@JavaPlanet 所提到的,对于真正的大数据,您不想合并到一个分区中。它将大大降低您的并行度。

于 2018-03-04T09:14:19.583 回答