5

我真的被困在一个点上,我必须强制mapReduce框架只使用reducer一个特定键。我还想影响框架如何对键进行排序。我将通过一个例子来介绍这个问题:

我想以下列形式发出键值对:

< bxb >:<d1>
< bx >:<d2>
< b >:<d3>
< bax >:<d2,d3>
图1

关键是一个序列 - 如您所见 - 每个都以一个项目b开头,这将是一个数据类型string。值将是ObjectIDs,如字母d和数字所示。我从函数中发出了其他键值对map,它们以键中的不同项目开头,例如ax

< abx >:<d1>
< ax >:<d3>
< xaa >:<d3>
图2

reduce我需要强制框架为每个键值对调用一个函数,该函数以特定项目开头。此外,我必须强制在mapreduce以相反的字典顺序对键进行排序。因此,单个 reducer 将收到以下项目b的键值对:

< bxb >:<d1>
< bx >:<d2>
< bax >:<d2,d3>
< b >:<d3>
图3

我试过的:

我尝试以以下形式发出键值对:

b : < (d1 : < bxb >) >
b : < (d2 : < bx >) >
b : < (d3 : < b >) >
b : < (d2 : < bax >), (d3 : < bax > ) >
图 4

通过这种方式,单个 reducer 接收项目b的值,但正如您所看到的,它不是以相反的字典顺序,最糟糕的是,不能保证单个 reducer 会获取特定键的所有值(如 MongoDB 的 MapReduce文档状态)。

基本上:我必须以相反的字典顺序处理以特定项目开头的这些序列。

我没有任何想法可以引导我进一步解决问题。如何对键执行单个减速器并影响排序?我应该如何设计传递(发出)的数据结构以满足我的需求?

这些功能类似于 HadoopComparatorPartitioner.

更新 - - - - - - - - - - - - - - - - - - - - - - - - - -------------------------------------------------- ---------------------

Asya Kamsky 向我指出,finalize每个键只运行一次,因此它解决了分区问题,即每个值必须由单个 reducer 看到特定键。

排序仍然是一个问题。对于大型数据集,在内部实现我自己的排序finalize将意味着执行时间方面的巨大瓶颈,而我没有利用 和 之间的自然排序map机制reduce。键是 data type string,但很容易将它们编码为负数integers以强制反向排序。

让我们再次检查图 3

< bxb >:<d1>
< bx >:<d2>
< bax >:<d2,d3>
< b >:<d3>
图3

这就是 afinalize必须收到的密钥b。例如,键< b x b >在这里是复合的。Finalize 需要接收以b开头的键,但对于键的其他部分,按字典顺序相反。

有什么办法可以实现这一点并避免内部排序finalize

4

1 回答 1

3

您可以做的是“正常”发出文档并使用 reduce 将所有发出的值组合成一个排序数组。然后使用finalize方法在单个减速器中执行您要执行的任何处理。

MongoDB reduce 函数可以被多次调用,但也可以从不调用(如果只为特定键发出单个值)。Usingfinalize可以解决这两个问题,因为每个键只调用一次。

样本数据:

> db.sorts.find()
{ "_id" : 1, "b" : 1, "a" : 20 }
{ "_id" : 2, "b" : 1, "a" : 2 }
{ "_id" : 3, "b" : 2, "a" : 12 }
{ "_id" : 4, "b" : 3, "a" : 1 }
{ "_id" : 5, "b" : 2, "a" : 1 }
{ "_id" : 6, "b" : 3, "a" : 11 }
{ "_id" : 7, "b" : 3, "a" : 5 }
{ "_id" : 8, "b" : 2, "a" : 1 }
{ "_id" : 9, "b" : 1, "a" : 15 }

地图功能:

map = function() {
   emit( this.b, { val: [ this.a ] } );
}

通过遍历数组将新传入的 val 添加到排序数组中的 reduce 函数:

reduce = function( key, values) {
   var result = { val: [ ] };
   values.forEach(function(v) {
      var newval = v.val[0];
      var added = false;
      for (var i=0; i < result.val.length; i++) {
           if (newval < result.val[i]) {
                 result.val.splice(i, 0, newval);
                 added=true;
                 break;
           }
      }
      if ( !added ) {
         result.val.splice(result.val.length, 0, newval);
      }
   });
   return result;
}

Finalize 只返回一个简单的数组:

finalize = function( key, values ) {
   // values is document with a sorted array
   // do your "single reduce" functionality here
   return values.val;
}

运行 MapReduce:

> db.sorts.mapReduce(map, reduce, {out:"outs", finalize:finalize})
{
    "result" : "outs",
    "timeMillis" : 10,
    "counts" : {
        "input" : 9,
        "emit" : 9,
        "reduce" : 3,
        "output" : 3
    },
    "ok" : 1,
}

结果是:

> db.outs.find()
{ "_id" : 1, "value" : [  2,  15,  20 ] }
{ "_id" : 2, "value" : [  1,  1,  12 ] }
{ "_id" : 3, "value" : [  1,  5,  11 ] }
于 2013-10-18T08:27:12.830 回答