49

我正在编写一个 Spark 应用程序,并希望将一组键值对组合(K, V1), (K, V2), ..., (K, Vn)成一个键多值对(K, [V1, V2, ..., Vn])。我觉得我应该能够使用reduceByKey具有某种风味的功能来做到这一点:

My_KMV = My_KV.reduce(lambda a, b: a.append([b]))

发生这种情况时我得到的错误是:

“NoneType”对象没有“附加”属性。

我的键是整数,值 V1,...,Vn 是元组。我的目标是使用键和值列表(元组)创建一对。

4

10 回答 10

58

Map 和 ReduceByKey

输入类型和输出类型reduce必须相同,因此如果你想聚合一个列表,你必须map输入到列表。之后,您将这些列表合并为一个列表。

组合列表

您需要一种将列表组合成一个列表的方法。Python 提供了一些组合列表的方法

append修改第一个列表并将始终返回None.

x = [1, 2, 3]
x.append([4, 5])
# x is [1, 2, 3, [4, 5]]

extend做同样的事情,但解开列表:

x = [1, 2, 3]
x.extend([4, 5])
# x is [1, 2, 3, 4, 5]

两种方法都返回None,但您需要一个返回组合列表的方法,因此只需使用加号

x = [1, 2, 3] + [4, 5]
# x is [1, 2, 3, 4, 5]

火花

file = spark.textFile("hdfs://...")
counts = file.flatMap(lambda line: line.split(" ")) \
         .map(lambda actor: (actor.split(",")[0], actor)) \ 

         # transform each value into a list
         .map(lambda nameTuple: (nameTuple[0], [ nameTuple[1] ])) \

         # combine lists: ([1,2,3] + [4,5]) becomes [1,2,3,4,5]
         .reduceByKey(lambda a, b: a + b)

组合键

也可以用 来解决这个问题combineByKey,它在内部用于实现reduceByKey,但它更复杂,并且“使用 Spark 中专门的每个键组合器可以更快”。对于上面的解决方案,您的用例很简单。

GroupByKey

也可以使用 来解决这个问题groupByKey但它会降低并行化,因此对于大数据集可能会慢得多。

于 2014-11-18T19:43:45.373 回答
19

tl;博士如果您真的需要像@MariusIongroupByKey 建议的那样使用这种操作。与直接分组相比,这里提出的所有其他解决方案要么直接效率低下,要么至少次优。

reduceByKey使用列表连接不是可接受的解决方案,因为:

  • 需要初始化O(N)列表。
  • +对一对列表的每次应用都需要两个列表的完整副本 ( O(N) ),从而有效地将整体复杂度增加到O(N 2 )
  • 不解决任何由groupByKey. 必须洗牌的数据量以及最终结构的大小是相同的。
  • 其中一个答案所建议的reduceByKey不同,使用和实现之间的并行度没有区别groupByKey

combineByKeywithlist.extend是次优解决方案,因为:

  • 在其中创建O(N)列表对象MergeValue(这可以通过list.append直接在新项目上使用来优化)。
  • 如果使用它进行优化,list.append则完全等同于 a 的旧 (Spark <= 1.3) 实现,groupByKey并忽略 SPARK-3074 引入的所有优化,这些优化支持对大于内存的结构进行外部(磁盘上)分组。
于 2016-09-04T10:59:36.407 回答
15

我的谈话有点晚了,但这是我的建议:

>>> foo = sc.parallelize([(1, ('a','b')), (2, ('c','d')), (1, ('x','y'))])
>>> foo.map(lambda (x,y): (x, [y])).reduceByKey(lambda p,q: p+q).collect()
[(1, [('a', 'b'), ('x', 'y')]), (2, [('c', 'd')])]
于 2015-01-28T20:31:41.887 回答
10

您可以使用 RDD groupByKey方法。

输入:

data = [(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd'), (2, 'e'), (3, 'f')]
rdd = sc.parallelize(data)
result = rdd.groupByKey().collect()

输出:

[(1, ['a', 'b']), (2, ['c', 'd', 'e']), (3, ['f'])]
于 2015-04-24T10:42:41.013 回答
3

如果你想做一个 reduceByKey,其中减少的 KV 对中的类型与原始 KV 对中的类型不同,那么可以使用该函数combineByKey。该函数所做的是获取 KV 对并将它们(通过 Key)组合成 KC 对,其中 C 是与 V 不同的类型。

一个指定了 3 个函数,createCombiner、mergeValue、mergeCombiners。第一个指定如何将类型 V 转换为类型 C,第二个描述如何将类型 C 与类型 V 组合,最后指定如何将类型 C 与另一个类型 C 组合。我的代码创建了 KV 对:

定义 3 个函数如下:

def Combiner(a):    #Turns value a (a tuple) into a list of a single tuple.
    return [a]

def MergeValue(a, b): #a is the new type [(,), (,), ..., (,)] and b is the old type (,)
    a.extend([b])
    return a

def MergeCombiners(a, b): #a is the new type [(,),...,(,)] and so is b, combine them
    a.extend(b)
    return a

然后,My_KMV = My_KV.combineByKey(Combiner, MergeValue, MergeCombiners)

我发现使用此功能的最佳资源是:http ://abshinn.github.io/python/apache-spark/2014/10/11/using-combinebykey-in-apache-spark/

正如其他人指出的那样,a.append(b)a.extend(b)返回None。所以在reduceByKey(lambda a, b: a.append(b))第一对 KV 对上返回 None ,然后在第二对上失败,因为 None.append(b) 失败。您可以通过定义一个单独的函数来解决此问题:

 def My_Extend(a,b):
      a.extend(b)
      return a

然后调用reduceByKey(lambda a, b: My_Extend(a,b))(这里可能不需要使用lambda函数,但我没有测试过这种情况。)

于 2014-11-20T15:39:59.060 回答
2

错误消息源于闭包中“a”的类型。

 My_KMV = My_KV.reduce(lambda a, b: a.append([b]))

让 pySpark 将 a 显式评估为列表。例如,

My_KMV = My_KV.reduceByKey(lambda a,b:[a].extend([b]))

在很多情况下,reduceByKey 会比 groupByKey 更可取,参考: http ://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html

于 2015-06-23T17:33:32.163 回答
2

我试过 combineByKey ,这是我的步骤

combineddatardd=sc.parallelize([("A", 3), ("A", 9), ("A", 12),("B", 4), ("B", 10), ("B", 11)])

combineddatardd.combineByKey(lambda v:[v],lambda x,y:x+[y],lambda x,y:x+y).collect()

输出:

[('A', [3, 9, 12]), ('B', [4, 10, 11])]
  1. 为组合器定义一个函数,将累加器设置为它在分区内遇到的第一个键值对,在此步骤中将值转换为列表

  2. 定义一个函数,将同一键的新值合并到步骤 1 中捕获的累加器值注意:-在此函数中将值转换为列表,因为累加器值在第一步中被转换为列表

  3. 定义功能以合并各个分区的组合器输出。

于 2016-02-15T05:59:12.390 回答
1

行。我希望,我做对了。你的输入是这样的:

kv_input = [("a", 1), ("a", 2), ("a", 3), ("b", 1), ("b", 5)]

你想得到这样的东西:

kmv_output = [("a", [1, 2, 3]), ("b", [1, 5])]

那么这可能会完成这项工作(见这里):

d = dict()
for k, v in kv_input:
    d.setdefault(k, list()).append(v)
kmv_output = list(d.items())

如果我弄错了,请告诉我,以便我可以根据您的需要进行调整。

PS:a.append([b])总是返回None。您可能想观察[b]或观察a的结果,但不是append

于 2014-11-18T21:44:28.170 回答
0

我在寻找同样问题的java示例时点击了这个页面。(如果你的情况类似,这是我的例子)

诀窍是-您需要为键分组。

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class SparkMRExample {

    public static void main(String[] args) {
        // spark context initialisation
        SparkConf conf = new SparkConf()
                .setAppName("WordCount")
                .setMaster("local");
        JavaSparkContext context = new JavaSparkContext(conf);

        //input for testing;
        List<String> input = Arrays.asList("Lorem Ipsum is simply dummy text of the printing and typesetting industry.",
                "Lorem Ipsum has been the industry's standard dummy text ever since the 1500s, when an unknown printer took a galley of type and scrambled it to make a type specimen book.",
                "It has survived not only for centuries, but also the leap into electronic typesetting, remaining essentially unchanged.",
                "It was popularised in the 1960s with the release of Letraset sheets containing Lorem Ipsum passages, and more recently with desktop publishing");
        JavaRDD<String> inputRDD = context.parallelize(input);


        // the map phase of word count example
        JavaPairRDD<String, Integer> mappedRDD =
                inputRDD.flatMapToPair( line ->                      // for this input, each string is a line
                        Arrays.stream(line.split("\\s+"))            // splitting into words, converting into stream
                                .map(word -> new Tuple2<>(word, 1))  // each word is assigned with count 1
                                .collect(Collectors.toList()));      // stream to iterable

        // group the tuples by key
        // (String,Integer) -> (String, Iterable<Integer>)
        JavaPairRDD<String, Iterable<Integer>> groupedRDD = mappedRDD.groupByKey();

        // the reduce phase of word count example
        //(String, Iterable<Integer>) -> (String,Integer)
        JavaRDD<Tuple2<String, Integer>> resultRDD =
                groupedRDD.map(group ->                                      //input is a tuple (String, Iterable<Integer>)
                        new Tuple2<>(group._1,                              // the output key is same as input key
                        StreamSupport.stream(group._2.spliterator(), true)  // converting to stream
                                .reduce(0, (f, s) -> f + s)));              // the sum of counts
        //collecting the RRD so that we can print
        List<Tuple2<String, Integer>> result = resultRDD.collect();
        // print each tuple
        result.forEach(System.out::println);
    }
}
于 2015-04-06T08:31:34.287 回答
0

我希望你有这样的输入数据

10 1
10 2
20 4
20 7
20 9

你想要这样的输出

10-1,2
20-4,7,9

你可以做这样的事情

rdd=sc.textFile("location_of_file") 

def parse(line):
    fields=line.split(" ")
    return (fields[0],fields[1])

rdd1=rdd.map(parse) //parse func is for having the input as key,value pair
rdd1.groupByKey().mapValues(list).collect()
于 2022-01-01T19:48:39.497 回答