1

假设我有一个包含此类信息的大 tsv 文件:

2012-09-22 00:00:01.0   249342258346881024  47268866    0   0   0   bo
2012-09-22 00:00:02.0   249342260934746115  1344951     0   0   4   ot
2012-09-22 00:00:02.0   249342261098336257  346095334   1   0   0   ot
2012-09-22 00:05:02.0   249342261500977152  254785340   0   1   0   ot

我想实现一个 MapReduce 作业,它枚举五分钟的时间间隔并过滤 tsv 输入的一些信息。输出文件如下所示:

0 47268866  bo
0 134495    ot
0 346095334 ot
1 254785340 ot

关键是区间的编号,例如,0 是 到 之间的区间的2012-09-22 00:00:00.0参考2012-09-22 00:04:59

我不知道这个问题是否不适合 MapReduce 方法,或者我是否认为它不正确。在 map 函数中,我只是将时间戳作为键传递,将过滤后的信息作为值传递。在 reduce 函数中,我使用全局变量计算间隔并产生提到的输出。

一世。框架是否以某种自动方式确定减速器的数量还是由用户定义?使用一个 reducer,我认为我的方法没有问题,但我想知道在处理非常大的文件时,一个 reducer 是否会成为瓶颈,可以吗?

ii. 如何使用多个减速器解决这个问题?

任何建议将不胜感激!提前致谢!

编辑:

第一个问题由@Olaf 回答,但第二个问题仍然让我对并行性有一些疑问。我的地图函数的地图输出目前是这样的(我只是以分钟精度传递时间戳):

2012-09-22 00:00   47268866    bo
2012-09-22 00:00   344951      ot
2012-09-22 00:00   346095334   ot
2012-09-22 00:05   254785340   ot

所以在reduce函数中,我收到输入,键表示收集信息的分钟,值表示信息本身,我想枚举从0开始的五分钟间隔。我目前正在使用全局变量来存储开始间隔,当一个键推断它时,我正在增加间隔计数器(这也是一个全局变量)。

这是代码:

private long stepRange = TimeUnit.MINUTES.toMillis(5);
private long stepInitialMillis = 0;
private int stepCounter = 0;

@Override
public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {

    long millis = Long.valueOf(key.toString());
    if (stepInitialMillis == 0) {
        stepInitialMillis = millis;
    } else {
        if (millis - stepInitialMillis > stepRange) {
            stepCounter = stepCounter + 1;
            stepInitialMillis = millis;
        }
    }
    for (Text value : values) {
        context.write(new Text(String.valueOf(stepCounter)),
                new Text(key.toString() + "\t" + value));
    }
}

因此,使用多个 reducer,我将在两个或更多节点上运行我的 reduce 函数,在两个或更多 JVM 中,我将失去全局变量赋予的控制权,而且我没有考虑针对我的情况的解决方法。

4

2 回答 2

2

reducer 的数量取决于集群的配置,尽管您可以限制 MapReduce 作业使用的 reducer 的数量。

如果您要处理大量数据,单个 reducer 确实会成为 MapReduce 作业的瓶颈。

Hadoop MapReduce 引擎保证与同一个键关联的所有值都发送到同一个 reducer,因此您的方法应该适用于多个 reducer。见雅虎!详细教程:http: //developer.yahoo.com/hadoop/tutorial/module4.html#listreducing

编辑:为了保证相同时间间隔的所有值都进入同一个减速器,您必须使用时间间隔的一些唯一标识符作为键。您必须在映射器中执行此操作。我正在再次阅读您的问题,除非您想以某种方式聚合对应于相同时间间隔的记录之间的数据,否则您根本不需要任何减速器。

编辑:正如@SeanOwen 指出的那样,reducer 的数量取决于集群的配置。通常配置为每个节点的最大任务数乘以数据节点数的 0.95 到 1.75 倍。如果集群配置中未设置 mapred.reduce.tasks 值,则默认 reducer 数量为 1。

于 2013-10-16T19:26:47.580 回答
1

看起来您希望按五分钟的时间块聚合一些数据。使用 Hadoop 的 Map-reduce 非常适合这类事情!应该没有理由使用任何“全局变量”。这是我将如何设置它:

映射器读取 TSV 的一行。它获取时间戳,并计算它属于哪个五分钟存储桶。将其制成一个字符串,并将其作为键发出,例如“20120922:0000”、“20120922:0005”、“20120922:0010”等. 至于与该键一起发出的值,只需保持简单,然后将整个制表符分隔的行作为另一个 Text 对象发送。

既然 mapper 已经确定了数据需要如何组织,那么 reducer 的工作就是进行聚合。每个 reducer 都会获得一个密钥(一个五分钟的存储桶),以及适合该存储桶的所有行的列表。它可以遍历该列表,并从中提取所需的任何内容,并根据需要将输出写入上下文。

至于映射器,让 hadoop 弄清楚那部分。将 reducer 的数量设置为集群中的节点数,作为起点。应该运行得很好。

希望这可以帮助。

于 2013-10-24T21:44:52.363 回答