20

我是 spark 新手,我想使用 group-by 和 reduce 从 CSV 中找到以下内容(一行):

  Department, Designation, costToCompany, State
  Sales, Trainee, 12000, UP
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, TN
  Sales, Lead, 32000, AP
  Sales, Lead, 32000, TN 
  Sales, Lead, 32000, LA
  Sales, Lead, 32000, LA
  Marketing, Associate, 18000, TN
  Marketing, Associate, 18000, TN
  HR, Manager, 58000, TN

我想简化关于按部门、名称、状态分组的关于 CSV的附加列,其中包含sum(costToCompany)TotalEmployeeCount

应该得到如下结果:

  Dept, Desg, state, empCount, totalCost
  Sales,Lead,AP,2,64000
  Sales,Lead,LA,3,96000  
  Sales,Lead,TN,2,64000

有没有办法使用转换和操作来实现这一点。还是我们应该进行 RDD 操作?

4

4 回答 4

40

程序

  • 创建一个类(Schema)来封装您的结构(方法 B 不需要它,但如果您使用 Java,它会使您的代码更易于阅读)

    public class Record implements Serializable {
      String department;
      String designation;
      long costToCompany;
      String state;
      // constructor , getters and setters  
    }
    
  • 加载 CVS (JSON) 文件

    JavaSparkContext sc;
    JavaRDD<String> data = sc.textFile("path/input.csv");
    //JavaSQLContext sqlContext = new JavaSQLContext(sc); // For previous versions 
    SQLContext sqlContext = new SQLContext(sc); // In Spark 1.3 the Java API and Scala API have been unified
    
    
    JavaRDD<Record> rdd_records = sc.textFile(data).map(
      new Function<String, Record>() {
          public Record call(String line) throws Exception {
             // Here you can use JSON
             // Gson gson = new Gson();
             // gson.fromJson(line, Record.class);
             String[] fields = line.split(",");
             Record sd = new Record(fields[0], fields[1], fields[2].trim(), fields[3]);
             return sd;
          }
    });
    

此时您有两种方法:

A. SparkSQL

  • 注册一个表(使用您定义的模式类)

    JavaSchemaRDD table = sqlContext.applySchema(rdd_records, Record.class);
    table.registerAsTable("record_table");
    table.printSchema();
    
  • 使用所需的 Query-group-by 查询表

    JavaSchemaRDD res = sqlContext.sql("
      select department,designation,state,sum(costToCompany),count(*) 
      from record_table 
      group by department,designation,state
    ");
    
  • 在这里,您还可以使用 SQL 方法执行您想要的任何其他查询

B. 火花

  • 使用复合键映射:Department, Designation,State

    JavaPairRDD<String, Tuple2<Long, Integer>> records_JPRDD = 
    rdd_records.mapToPair(new
      PairFunction<Record, String, Tuple2<Long, Integer>>(){
        public Tuple2<String, Tuple2<Long, Integer>> call(Record record){
          Tuple2<String, Tuple2<Long, Integer>> t2 = 
          new Tuple2<String, Tuple2<Long,Integer>>(
            record.Department + record.Designation + record.State,
            new Tuple2<Long, Integer>(record.costToCompany,1)
          );
          return t2;
    }
    

    });

  • reduceByKey 使用复合键,求和costToCompany列,按键累加记录数

    JavaPairRDD<String, Tuple2<Long, Integer>> final_rdd_records = 
     records_JPRDD.reduceByKey(new Function2<Tuple2<Long, Integer>, Tuple2<Long,
     Integer>, Tuple2<Long, Integer>>() {
        public Tuple2<Long, Integer> call(Tuple2<Long, Integer> v1,
        Tuple2<Long, Integer> v2) throws Exception {
            return new Tuple2<Long, Integer>(v1._1 + v2._1, v1._2+ v2._2);
        }
    });
    
于 2014-08-18T15:33:16.567 回答
25

CSV 文件可以使用 Spark 内置的 CSV 阅读器进行解析。它将在成功读取文件时返回 DataFrame/DataSet。在 DataFrame/DataSet 之上,您可以轻松应用类似 SQL 的操作。

在 Java 中使用 Spark 2.x(及更高版本)

又名创建 SparkSession 对象spark

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
    .builder()
    .appName("Java Spark SQL Example")
    .getOrCreate();

为行创建架构StructType

import org.apache.spark.sql.types.StructType;

StructType schema = new StructType()
    .add("department", "string")
    .add("designation", "string")
    .add("ctc", "long")
    .add("state", "string");

从 CSV 文件创建数据框并将架构应用到它

Dataset<Row> df = spark.read()
    .option("mode", "DROPMALFORMED")
    .schema(schema)
    .csv("hdfs://path/input.csv");

从 CSV 文件读取数据的更多选项

现在我们可以通过两种方式聚合数据

1.SQL方式

在 spark sql metastore 中注册一个表来执行 SQL 操作

df.createOrReplaceTempView("employee");

在注册的数据框上运行 SQL 查询

Dataset<Row> sqlResult = spark.sql(
    "SELECT department, designation, state, SUM(ctc), COUNT(department)" 
        + " FROM employee GROUP BY department, designation, state");

sqlResult.show(); //for testing

我们甚至可以直接在 CSV 文件上执行 SQL,而无需使用 Spark SQL 创建表


2.对象链接或编程或类似Java的方式

对 sql 函数进行必要的导入

import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.sum;

使用groupByagg在数据框/数据集上执行countsum数据

Dataset<Row> dfResult = df.groupBy("department", "designation", "state")
    .agg(sum("ctc"), count("department"));
// After Spark 1.6 columns mentioned in group by will be added to result by default

dfResult.show();//for testing

依赖库

"org.apache.spark" % "spark-core_2.11" % "2.0.0" 
"org.apache.spark" % "spark-sql_2.11" % "2.0.0"
于 2017-07-03T15:48:50.963 回答
4

以下内容可能不完全正确,但它应该让您了解如何处理数据。它不漂亮,应该用案例类等替换,但作为如何使用 spark api 的快速示例,我希望它就足够了 :)

val rawlines = sc.textfile("hdfs://.../*.csv")
case class Employee(dep: String, des: String, cost: Double, state: String)
val employees = rawlines
  .map(_.split(",") /*or use a proper CSV parser*/
  .map( Employee(row(0), row(1), row(2), row(3) )

# the 1 is the amount of employees (which is obviously 1 per line)
val keyVals = employees.map( em => (em.dep, em.des, em.state), (1 , em.cost))

val results = keyVals.reduceByKey{ a,b =>
    (a._1 + b._1, b._1, b._2) # (a.count + b.count , a.cost + b.cost )
}

#debug output
results.take(100).foreach(println)

results
  .map( keyval => someThingToFormatAsCsvStringOrWhatever )
  .saveAsTextFile("hdfs://.../results")

或者你可以使用 SparkSQL:

val sqlContext = new SQLContext(sparkContext)

# case classes can easily be registered as tables
employees.registerAsTable("employees")

val results = sqlContext.sql("""select dep, des, state, sum(cost), count(*) 
  from employees 
  group by dep,des,state"""
于 2014-08-18T15:04:03.313 回答
4

对于 JSON,如果您的文本文件每行包含一个 JSON 对象,您可以使用sqlContext.jsonFile(path)让 Spark SQL 将其加载为一个SchemaRDD(将自动推断架构)。然后,您可以将其注册为表并使用 SQL 进行查询。您还可以手动将文本文件加载为RDD[String]每条记录包含一个 JSON 对象,并将sqlContext.jsonRDD(rdd)其转换为SchemaRDD. jsonRDD当您需要预处理数据时很有用。

于 2014-08-20T21:01:36.893 回答