我有一个读取输入并写入数据库的映射器。我想限制实际转换并写入该数据库的输入数量,并且所有映射器都必须对限制做出贡献,然后在达到该限制后停止(大约;一两个额外的没什么大不了的。)
我在我们的映射器上实现了一个限制器功能,它询问其他任务“您导入了多少条记录?” 一旦达到给定的限制,它将停止导入这些记录(尽管它将继续处理它们以用于其他目的。)
有问题的地图代码如下所示:
public void map(ImmutableBytesWritable key, Result row, Context context) {
// prepare the input
// ...
if (context.getCounter(Metrics.IMPORTED).getValue()<IMPORT_LIMIT){
importRecord();
context.getCounter(Metrics.IMPORTED).increment(1l);
}
// do other things
// ...
}
因此,每个映射器都会检查是否还有更多空间可以导入,并且只有在未达到限制时才会执行任何导入。但是,每个映射器本身都导入了限制,因此对于 16 个映射器,我们导入了 16*IMPORT_LIMIT 记录。它肯定在做一些限制(计数远低于导入记录的正常数量。)
何时将计数器值推送到其他映射器,或者它们甚至可供每个映射器使用?我实际上可以从计数器获得一些实时值,还是它们仅在映射器完成时更新?有没有更好的方法在映射器之间共享值?