1

我在更大的堆叠数据集上尝试了 1.6 的新“枢轴”功能。它有 5,656,458 行,IndicatorCode列有 1344 个不同的代码。

这个想法是使用 pivot 来“unstack”(用 pandas 术语)这个数据集,并为每个 IndicatorCode 设置一个列。

schema = StructType([ \
   StructField("CountryName", StringType(), True), \
   StructField("CountryCode", StringType(), True), \
   StructField("IndicatorName", StringType(), True), \
   StructField("IndicatorCode", StringType(), True), \
   StructField("Year", IntegerType(), True), \
   StructField("Value", DoubleType(), True)  \
])

data = sqlContext.read.load('hdfs://localhost:9000/tmp/world-development-indicators/Indicators.csv', 
                            format='com.databricks.spark.csv', 
                            header='true', 
                            schema=schema)

data2 = indicators_csv.withColumn("IndicatorCode2", regexp_replace("indicatorCode", "\.", "_"))\
                      .select(["CountryCode", "IndicatorCode2", "Year", "Value"])

columns = [row.IndicatorCode2 for row in data2.select("IndicatorCode2").distinct().collect()]

data3 = data2.groupBy(["Year", "CountryCode"])\
             .pivot("IndicatorCode2", columns)\
             .max("Value")

虽然这成功返回,但data3.first()从未返回结果(我在 10 分钟后使用 3 个核心在我的独立设备上中断)。

我的方法使用RDD并且aggregateByKey效果很好,所以我不是在寻找关于如何做到这一点的解决方案,而是使用 DataFrames 进行枢轴是否也可以解决问题。

4

2 回答 2

3

好吧,旋转通常不是一个非常有效的操作,使用DataFrameAPI 也无能为力。您可以尝试的一件事是repartition您的数据:

(data2
  .repartition("Year", "CountryCode")
  .groupBy("Year", "CountryCode")
  .pivot("IndicatorCode2", columns)
  .max("Value"))

甚至聚合:

from pyspark.sql.functions import max

(df
    .groupBy("Year", "CountryCode", "IndicatorCode")
    .agg(max("Value").alias("Value"))
    .groupBy("Year", "CountryCode")
    .pivot("IndicatorCode", columns)
    .max("Value"))

申请前pivot。两种解决方案背后的想法是相同的。而不是移动大扩展Rows移动狭窄密集数据并在本地扩展。

于 2016-02-16T16:19:43.950 回答
1

Spark 2.0 引入了SPARK-13​​749一种枢轴实现,对于大量枢轴列值来说速度更快。

在我的计算机上使用 Spark 2.1.0 进行测试,您的示例现在可以在 48 秒内运行。

于 2017-03-10T16:45:47.247 回答