0

我有一个读取输入并写入数据库的映射器。我想限制实际转换并写入该数据库的输入数量,并且所有映射器都必须对限制做出贡献,然后在达到该限制后停止(大约;一两个额外的没什么大不了的。)

我在我们的映射器上实现了一个限制器功能,它询问其他任务“您导入了多少条记录?” 一旦达到给定的限制,它将停止导入这些记录(尽管它将继续处理它们以用于其他目的。)

有问题的地图代码如下所示:

public void map(ImmutableBytesWritable key, Result row, Context context) {
  // prepare the input
  // ...

  if (context.getCounter(Metrics.IMPORTED).getValue()<IMPORT_LIMIT){
    importRecord();
    context.getCounter(Metrics.IMPORTED).increment(1l);
  }

  // do other things
  // ...
}

因此,每个映射器都会检查是否还有更多空间可以导入,并且只有在未达到限制时才会执行任何导入。但是,每个映射器本身都导入了限制,因此对于 16 个映射器,我们导入了 16*IMPORT_LIMIT 记录。它肯定在做一些限制(计数远低于导入记录的正常数量。)

何时将计数器值推送到其他映射器,或者它们甚至可供每个映射器使用?我实际上可以从计数器获得一些实时值,还是它们仅在映射器完成时更新?有没有更好的方法在映射器之间共享值?

4

1 回答 1

0

好的:据我所见,MapReduce 在工作完成之前不会在映射器之间共享计数器(即根本没有。)我不确定中途提交的映射器是否会允许以后的映射器看到他们的计数器,但它不够可靠,无法实时完成。

相反,我将运行一个简单的 java 应用程序,它自己迭代行并写入列,现有的 MapReduce 作业将使用该列来确定它是否应该导入行。

于 2012-06-25T15:08:08.803 回答