0

我有一个由 4 个 riak 节点组成的集群,其中加载了数据。我正在尝试执行一个简单的 MapReduce 作业,它只是聚合,但我试图通过提供我自己的 javascript 函数来完成它(以便移动到更多涉及的 MapReduce 作业)。

我相关的 Java 代码片段是:

IndexQuery iq = new IntRangeQuery(IntIndex.named(indexId), bucketId, 11, 40);
Function mapfunc = new JSSourceFunction(
    streamToString(MapReduceDriver.class.getResourceAsStream("/map_1.js")));
Function redfunc = new JSSourceFunction(
    streamToString(MapReduceDriver.class.getResourceAsStream("/reduce_1.js")));
PBMapReduceResult result = (PBMapReduceResult) riakClient.mapReduce(iq)
            .addMapPhase(mapfunc)
            .addReducePhase(redfunc)
            .execute();

两个javascript函数在哪里:

function map_keepAttr(value, keyData, arg) {
    var data = Riak.mapValuesJson(value)[0];
    return [ data.Attribute_17 ];
}

function reduce_aggregate(values, arg) {
    return [values.length];
}

我看到的问题如下:我的查询和映射阶段正好产生了 30 个值。但减少阶段报告 3 而不是 30(因此计数不正确)。更奇怪的是,当我使用下面的reduce函数时:

function reduce_aggregate(values, arg) {
    return values.length;
}

我得到了预期的结果,即一个包含 30 个条目的 json 数组。

任何帮助都会救我,因为我似乎不明白 Riak 中的 MapReduce 是如何工作的。

谢谢!

4

1 回答 1

1

我怀疑您看到的问题可能是由于没有考虑减少阶段功能中的重新减少。

虽然 map 阶段函数每条记录执行一次,但 reduce 阶段函数不一定会以完整数据集作为输入执行一次,而是在 map 阶段输出的部分上递归执行,直到处理完所有记录。第一次运行 reduce 函数创建的结果将包含在发送到下一次调用的数组中。

为了使用 reduce 函数计算项目的数量,您需要能够将结果与以前的 reduce 函数与 map 阶段输入区分开来,或者确保它们具有相同的格式并且无论数据来自何处都可以正确聚合从。

于 2012-11-08T14:50:33.113 回答