我在 3 个节点上有一个分布式系统,我的数据分布在这些节点之间。例如,我有一个test.csv
文件存在于所有 3 个节点上,它包含 4 列
row | id, C1, C2, C3
----------------------
row1 | A1 , c1 , c2 ,2
row2 | A1 , c1 , c2 ,1
row3 | A1 , c11, c2 ,1
row4 | A2 , c1 , c2 ,1
row5 | A2 , c1 , c2 ,1
row6 | A2 , c11, c2 ,1
row7 | A2 , c11, c21,1
row8 | A3 , c1 , c2 ,1
row9 | A3 , c1 , c2 ,2
row10 | A4 , c1 , c2 ,1
我想尝试聚合上述结果集。如何按id
、c1
、c2
和c3
列聚合数据集并像这样输出?
row | id, C1, C2, C3
----------------------
row1 | A1 , c1 , c2 ,3
row2 | A1 , c11, c2 ,1
row3 | A2 , c1 , c2 ,2
row4 | A2 , c11, c2 ,1
row5 | A2 , c11, c21,1
row6 | A3 , c1 , c2 ,3
row7 | A4 , c1 , c2 ,1
我尝试了以下方法:
from array import array
from datetime import datetime
import pyspark.sql
from pyspark.sql import Row, SQLContext, StructField, StringType, IntegerType
schema = StructType([
StructField("id", StringType(), False),
StructField("C1", StringType(), False),
StructField("C2", StringType(), False),
StructField("C3", IntegerType(), False)])
base_rdd = sc.textFile("/home/hduser/spark-1.1.0/Data/test.tsv").map(lambda l:
l.split(",")
rdd = base_rdd.map(lambda x: Row(id = x[0], C1 = x[1], C2 = x[2], C3 = int(x[3])))
sqlContext = SQLContext(sc)
srdd = sqlContext.inferSchema(rdd)