6

我一直在玩 Spark,我设法让它来处理我的数据。我的数据由平面分隔的文本文件组成,由 50 列和大约 2000 万行组成。我有可以处理每一列的 scala 脚本。

在并行处理方面,我知道 RDD 操作在多个节点上运行。因此,每次我处理一列时,它们都会并行处理,但列本身是按顺序处理的。

一个简单的例子:如果我的数据是 5 列文本分隔文件并且每列包含文本,我想对每列进行字数统计。我会做:

for(i <- 0 until 4){
   data.map(_.split("\t",-1)(i)).map((_,1)).reduce(_+_)
}

尽管每列的操作是并行运行的,但列本身是按顺序处理的(我知道的措辞不好。对不起!)。换句话说,在第 1 列完成后处理第 2 列。第 3 列在第 1 列和第 2 列完成后处理,依此类推。

我的问题是:有没有一次处理多个列?如果你知道一种方法,cor一个教程,你介意与我分享吗?

谢谢你!!

4

2 回答 2

3

假设输入是seq。可以执行以下操作以同时处理列。基本思想是使用序列(列,输入)作为键。

scala> val rdd = sc.parallelize((1 to 4).map(x=>Seq("x_0", "x_1", "x_2", "x_3")))
rdd: org.apache.spark.rdd.RDD[Seq[String]] = ParallelCollectionRDD[26] at parallelize at <console>:12

scala> val rdd1 = rdd.flatMap{x=>{(0 to x.size - 1).map(idx=>(idx, x(idx)))}}
rdd1: org.apache.spark.rdd.RDD[(Int, String)] = FlatMappedRDD[27] at flatMap at <console>:14

scala> val rdd2 = rdd1.map(x=>(x, 1))
rdd2: org.apache.spark.rdd.RDD[((Int, String), Int)] = MappedRDD[28] at map at <console>:16

scala> val rdd3 = rdd2.reduceByKey(_+_)
rdd3: org.apache.spark.rdd.RDD[((Int, String), Int)] = ShuffledRDD[29] at reduceByKey at <console>:18

scala> rdd3.take(4)
res22: Array[((Int, String), Int)] = Array(((0,x_0),4), ((3,x_3),4), ((2,x_2),4), ((1,x_1),4))

示例输出:((0, x_0), 4) 表示第一列,key为x_0,value为4。可以从这里开始进一步处理。

于 2014-10-11T05:41:28.003 回答
1

您可以尝试以下代码,它使用 scala 并行化集合功能,

(0 until 4).map(index => (index,data)).par.map(x => {
    x._2.map(_.split("\t",-1)(x._1)).map((_,1)).reduce(_+_)
}

数据是一个参考,所以复制数据不会花费太多。而且 rdd 是只读的,所以可以并行处理。par方法使用并行收集功能。您可以在 spark Web UI 上检查并行作业。

于 2015-04-25T13:12:46.623 回答