0

我正在将一些 map-reduce 代码迁移到 Spark 中,并且在构造 Iterable 以在函数中返回时遇到问题。在MR代码中,我有一个按键分组的reduce函数,然后(使用multipleOutputs)将迭代这些值并使用write(在多个输出中,但这并不重要)像这样的一些代码(简化):

reduce(Key key, Iterable<Text> values) {
    // ... some code
    for (Text xml: values) {
        multipleOutputs.write(key, val, directory);
    }
}

但是,在 Spark 中,我已将地图翻译为以下序列:mapToPair -> groupByKey -> flatMap 推荐...在某些书中。

mapToPair 基本上是通过 functionMap 添加一个 Key,它基于记录上的一些值为该记录创建一个 Key。有时键可能具有非常高的基数。

JavaPairRDD<Key, String> rddPaired = inputRDD.mapToPair(new PairFunction<String, Key, String>() { 
    public Tuple2<Key, String> call(String value) {
        //... 
        return functionMap.call(value);
    }
});

rddPaired被应用一个RDD.groupByKey () 来获取 RDD 来提供 flatMap 函数:

JavaPairRDD<Key, Iterable<String>> rddGrouped = rddPaired.groupByKey();

分组后,调用 flatMap 来执行reduce。在这里,操作是一种转换:

public Iterable<String> call (Tuple2<Key, Iterable<String>> keyValue) {
    // some code...
    List<String> out = new ArrayList<String>();
    if (someConditionOnKey) { 
        // do a logic
        Grouper grouper = new Grouper();
        for (String xml : keyValue._2()) {
            // group in a separate class
            grouper.add(xml);
        }
        // operation is now performed on the whole group
        out.add(operation(grouper));
    } else {
        for (String xml : keyValue._2()) {
            out.add(operation(xml));
        }
        return out;
    }
}

它工作正常......使用没有太多记录的键。实际上,当具有大量值的键在 reduce 上输入“else”时,它会被 OutOfMemory 打破。

注意:我已经包含了“if”部分来解释我想要产生的逻辑,但是在输入“else”时会发生失败......因为当数据进入“else”时,通常意味着会有更多的值由于数据的性质。

很明显,必须将所有分组值保留在“out”列表中,如果一个键有数百万条记录,它将无法扩展,因为它将它们保留在内存中。我已经到了发生 OOM 的地步(是的,它是在执行上面要求内存的“操作”时 - 并且没有给出。虽然这不是一个非常昂贵的内存操作)。

有没有办法避免这种情况以扩大规模?要么通过使用其他指令复制行为以更可扩展的方式达到相同的输出,要么能够将值交给 Spark 进行合并(就像我过去对 MR 所做的那样)......

4

1 回答 1

2

在操作内部做条件是低效的flatMap。您应该检查外部条件以创建 2 个不同的 RDD 并分别处理它们。

rddPaired.cache();

// groupFilterFunc will filter which items need grouping
JavaPairRDD<Key, Iterable<String>> rddGrouped = rddPaired.filter(groupFilterFunc).groupByKey();
// processGroupedValuesFunction should call `operation` on group of all values with the same key and return the result
rddGrouped.mapValues(processGroupedValuesFunction);

// nogroupFilterFunc will filter which items don't need grouping
JavaPairRDD<Key, Iterable<String>> rddNoGrouped = rddPaired.filter(nogroupFilterFunc);
// processNoGroupedValuesFunction2 should call `operation` on a single value and return the result
rddNoGrouped.mapValues(processNoGroupedValuesFunction2);
于 2016-07-12T10:46:30.277 回答