假设我有一个包含此类信息的大 tsv 文件:
2012-09-22 00:00:01.0 249342258346881024 47268866 0 0 0 bo
2012-09-22 00:00:02.0 249342260934746115 1344951 0 0 4 ot
2012-09-22 00:00:02.0 249342261098336257 346095334 1 0 0 ot
2012-09-22 00:05:02.0 249342261500977152 254785340 0 1 0 ot
我想实现一个 MapReduce 作业,它枚举五分钟的时间间隔并过滤 tsv 输入的一些信息。输出文件如下所示:
0 47268866 bo
0 134495 ot
0 346095334 ot
1 254785340 ot
关键是区间的编号,例如,0 是 到 之间的区间的2012-09-22 00:00:00.0
参考2012-09-22 00:04:59
。
我不知道这个问题是否不适合 MapReduce 方法,或者我是否认为它不正确。在 map 函数中,我只是将时间戳作为键传递,将过滤后的信息作为值传递。在 reduce 函数中,我使用全局变量计算间隔并产生提到的输出。
一世。框架是否以某种自动方式确定减速器的数量还是由用户定义?使用一个 reducer,我认为我的方法没有问题,但我想知道在处理非常大的文件时,一个 reducer 是否会成为瓶颈,可以吗?
ii. 如何使用多个减速器解决这个问题?
任何建议将不胜感激!提前致谢!
编辑:
第一个问题由@Olaf 回答,但第二个问题仍然让我对并行性有一些疑问。我的地图函数的地图输出目前是这样的(我只是以分钟精度传递时间戳):
2012-09-22 00:00 47268866 bo
2012-09-22 00:00 344951 ot
2012-09-22 00:00 346095334 ot
2012-09-22 00:05 254785340 ot
所以在reduce函数中,我收到输入,键表示收集信息的分钟,值表示信息本身,我想枚举从0开始的五分钟间隔。我目前正在使用全局变量来存储开始间隔,当一个键推断它时,我正在增加间隔计数器(这也是一个全局变量)。
这是代码:
private long stepRange = TimeUnit.MINUTES.toMillis(5);
private long stepInitialMillis = 0;
private int stepCounter = 0;
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
long millis = Long.valueOf(key.toString());
if (stepInitialMillis == 0) {
stepInitialMillis = millis;
} else {
if (millis - stepInitialMillis > stepRange) {
stepCounter = stepCounter + 1;
stepInitialMillis = millis;
}
}
for (Text value : values) {
context.write(new Text(String.valueOf(stepCounter)),
new Text(key.toString() + "\t" + value));
}
}
因此,使用多个 reducer,我将在两个或更多节点上运行我的 reduce 函数,在两个或更多 JVM 中,我将失去全局变量赋予的控制权,而且我没有考虑针对我的情况的解决方法。