我想为 Hadoop 创建一个并行 scanLeft(计算关联运算符的前缀和)函数(特别是烫伤;请参阅下文了解如何完成)。
给定 hdfs 文件中的一系列数字(每行一个),我想用连续偶数/奇数对的总和计算一个新序列。例如:
输入序列:
0,1,2,3,4,5,6,7,8,9,10
输出顺序:
0+1、2+3、4+5、6+7、8+9、10
IE
1,5,9,13,17,10
我认为为了做到这一点,我需要为 Hadoop 编写一个 InputFormat 和 InputSplits 类,但我不知道该怎么做。
请参见此处的第 3.3 节。以下是 Scala 中的示例算法:
// for simplicity assume input length is a power of 2
def scanadd(input : IndexedSeq[Int]) : IndexedSeq[Int] =
if (input.length == 1)
input
else {
//calculate a new collapsed sequence which is the sum of sequential even/odd pairs
val collapsed = IndexedSeq.tabulate(input.length/2)(i => input(2 * i) + input(2*i+1))
//recursively scan collapsed values
val scancollapse = scanadd(collapse)
//now we can use the scan of the collapsed seq to calculate the full sequence
val output = IndexedSeq.tabulate(input.length)(
i => i.evenOdd match {
//if an index is even then we can just look into the collapsed sequence and get the value
// otherwise we can look just before it and add the value at the current index
case Even => scancollapse(i/2)
case Odd => scancollapse((i-1)/2) + input(i)
}
output
}
我知道这可能需要进行一些优化才能与 Hadoop 很好地配合使用。我认为直接翻译这个会导致非常低效的 Hadoop 代码。例如,显然在 Hadoop 中你不能使用 IndexedSeq。我会很感激你看到的任何具体问题。不过,我认为它可能会很好地工作。