我正在将一些 map-reduce 代码迁移到 Spark 中,并且在构造 Iterable 以在函数中返回时遇到问题。在MR代码中,我有一个按键分组的reduce函数,然后(使用multipleOutputs)将迭代这些值并使用write(在多个输出中,但这并不重要)像这样的一些代码(简化):
reduce(Key key, Iterable<Text> values) {
// ... some code
for (Text xml: values) {
multipleOutputs.write(key, val, directory);
}
}
但是,在 Spark 中,我已将地图翻译为以下序列:mapToPair -> groupByKey -> flatMap 推荐...在某些书中。
mapToPair 基本上是通过 functionMap 添加一个 Key,它基于记录上的一些值为该记录创建一个 Key。有时键可能具有非常高的基数。
JavaPairRDD<Key, String> rddPaired = inputRDD.mapToPair(new PairFunction<String, Key, String>() {
public Tuple2<Key, String> call(String value) {
//...
return functionMap.call(value);
}
});
rddPaired被应用一个RDD.groupByKey () 来获取 RDD 来提供 flatMap 函数:
JavaPairRDD<Key, Iterable<String>> rddGrouped = rddPaired.groupByKey();
分组后,调用 flatMap 来执行reduce。在这里,操作是一种转换:
public Iterable<String> call (Tuple2<Key, Iterable<String>> keyValue) {
// some code...
List<String> out = new ArrayList<String>();
if (someConditionOnKey) {
// do a logic
Grouper grouper = new Grouper();
for (String xml : keyValue._2()) {
// group in a separate class
grouper.add(xml);
}
// operation is now performed on the whole group
out.add(operation(grouper));
} else {
for (String xml : keyValue._2()) {
out.add(operation(xml));
}
return out;
}
}
它工作正常......使用没有太多记录的键。实际上,当具有大量值的键在 reduce 上输入“else”时,它会被 OutOfMemory 打破。
注意:我已经包含了“if”部分来解释我想要产生的逻辑,但是在输入“else”时会发生失败......因为当数据进入“else”时,通常意味着会有更多的值由于数据的性质。
很明显,必须将所有分组值保留在“out”列表中,如果一个键有数百万条记录,它将无法扩展,因为它将它们保留在内存中。我已经到了发生 OOM 的地步(是的,它是在执行上面要求内存的“操作”时 - 并且没有给出。虽然这不是一个非常昂贵的内存操作)。
有没有办法避免这种情况以扩大规模?要么通过使用其他指令复制行为以更可扩展的方式达到相同的输出,要么能够将值交给 Spark 进行合并(就像我过去对 MR 所做的那样)......