1

我在 3 个节点上有一个分布式系统,我的数据分布在这些节点之间。例如,我有一个test.csv文件存在于所有 3 个节点上,它包含 4 列

row   | id,  C1, C2,  C3
----------------------
row1  | A1 , c1 , c2 ,2
row2  | A1 , c1 , c2 ,1 
row3  | A1 , c11, c2 ,1 
row4  | A2 , c1 , c2 ,1 
row5  | A2 , c1 , c2 ,1 
row6  | A2 , c11, c2 ,1 
row7  | A2 , c11, c21,1 
row8  | A3 , c1 , c2 ,1
row9  | A3 , c1 , c2 ,2
row10 | A4 , c1 , c2 ,1

我想尝试聚合上述结果集。如何按idc1c2c3列聚合数据集并像这样输出?

row   | id,  C1, C2,  C3
----------------------
row1  | A1 , c1 , c2 ,3
row2  | A1 , c11, c2 ,1 
row3  | A2 , c1 , c2 ,2 
row4  | A2 , c11, c2 ,1 
row5  | A2 , c11, c21,1 
row6  | A3 , c1 , c2 ,3
row7  | A4 , c1 , c2 ,1

我尝试了以下方法:

from array import array 
from datetime import datetime 
import pyspark.sql 
from pyspark.sql import Row, SQLContext, StructField, StringType,  IntegerType

schema = StructType([
    StructField("id", StringType(), False),
    StructField("C1", StringType(), False), 
    StructField("C2", StringType(), False),
    StructField("C3", IntegerType(), False)])
base_rdd = sc.textFile("/home/hduser/spark-1.1.0/Data/test.tsv").map(lambda l: 

l.split(",")

rdd = base_rdd.map(lambda x: Row(id = x[0], C1 = x[1], C2 = x[2], C3 = int(x[3])))
sqlContext = SQLContext(sc)
srdd = sqlContext.inferSchema(rdd)
4

2 回答 2

1

要解决您的问题,您可以执行以下步骤。我不知道python步骤,下面是java步骤。我希望你能把它和python联系起来。

  1. 读取 csv 文件

JavaRDD<String> input = sc.textFile(args[0]);

  1. 从文件中创建一对 rdd

    JavaPairRDD<Integer,String> pairMap = input.mapToPair( new PairFunction<String, Integer, String>() { @Override public Tuple2<Integer, String> call(String line) throws Exception { String[] s = line.split(","); String key = s[0]+'#'+s[1]+'#' +s[2];// id,c1,c2 Integer value = Integer.valueOf(s[3]) //c3
    return new Tuple2<Integer,String>(key, value); } });

  2. 按键缩小地图

JavaPairRDD<String,Integer> result = pairMap.reduceByKey( new Function2<Integer, Integer, Integer>() {
@Override public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } });

  1. resultobject 包含您的预期结果,其中 key 是id+'#'+c1+'#'+c2并且 value 是聚合的c3。您可以根据需要进一步使用此地图。您可以标记密钥#以取回您的列,并可以使用 apache-spark-sql 插入表中。

我希望这有帮助。

于 2015-06-16T05:20:13.010 回答
1

首先,我建议使用“com.databricks.spark.csv”来读取 csv 文件(运行 pyspark shell http://spark-packages.org/package/databricks时需要使用“--package”加载它/spark-csv)。然后使用 groupBy 方法:

df = (sqlContext.read
  .format('com.databricks.spark.csv')
  .option("inferSchema", "true")
  .option("header", "true")
  .load("<your_file>.csv"))

df2= df.groupBy('id', 'C1', 'C2').agg({'C3': 'sum'}).sort('id', 'C1')

df.show()
+---+---+---+---+
| id| C1| C2| C3|
+---+---+---+---+
| A1| c1| c2|  2|
| A1| c1| c2|  1|
| A1|c11| c2|  1|
| A2| c1| c2|  1|
| A2| c1| c2|  1|
| A2|c11| c2|  1|
| A2|c11|c21|  1|
| A3| c1| c2|  1|
| A3| c1| c2|  2|
| A4| c1| c2|  1|
+---+---+---+---+

df2.show()

+---+---+---+-------+
| id| C1| C2|sum(C3)|
+---+---+---+-------+
| A1| c1| c2|      3|
| A1|c11| c2|      1|
| A2| c1| c2|      2|
| A2|c11| c2|      1|
| A2|c11|c21|      1|
| A3| c1| c2|      3|
| A4| c1| c2|      1|
+---+---+---+-------+

如果标签“行”很重要,您可以稍后添加并将“总和(C3)”重命名为“C3”。有关更多信息,请查看 Spark Python API https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame

于 2015-11-11T15:55:09.950 回答