我在更大的堆叠数据集上尝试了 1.6 的新“枢轴”功能。它有 5,656,458 行,IndicatorCode
列有 1344 个不同的代码。
这个想法是使用 pivot 来“unstack”(用 pandas 术语)这个数据集,并为每个 IndicatorCode 设置一个列。
schema = StructType([ \
StructField("CountryName", StringType(), True), \
StructField("CountryCode", StringType(), True), \
StructField("IndicatorName", StringType(), True), \
StructField("IndicatorCode", StringType(), True), \
StructField("Year", IntegerType(), True), \
StructField("Value", DoubleType(), True) \
])
data = sqlContext.read.load('hdfs://localhost:9000/tmp/world-development-indicators/Indicators.csv',
format='com.databricks.spark.csv',
header='true',
schema=schema)
data2 = indicators_csv.withColumn("IndicatorCode2", regexp_replace("indicatorCode", "\.", "_"))\
.select(["CountryCode", "IndicatorCode2", "Year", "Value"])
columns = [row.IndicatorCode2 for row in data2.select("IndicatorCode2").distinct().collect()]
data3 = data2.groupBy(["Year", "CountryCode"])\
.pivot("IndicatorCode2", columns)\
.max("Value")
虽然这成功返回,但data3.first()
从未返回结果(我在 10 分钟后使用 3 个核心在我的独立设备上中断)。
我的方法使用RDD
并且aggregateByKey
效果很好,所以我不是在寻找关于如何做到这一点的解决方案,而是使用 DataFrames 进行枢轴是否也可以解决问题。