1

我是一个hadoop初学者。我遇到了这个自定义的 RecordReader 程序,它一次读取 3 行并输出将 3 行输入提供给映射器的次数。

我能够理解为什么使用 RecordReader,但是当输入格式类本质上扩展 mapreduce.TextInputFormat 类时,我无法看到每个 InputSplit 如何包含 3 行。根据我的理解,TextInputFormat 类为每一行(对于每个 \n)发出 1 个 InputSplit。

那么 RecordReader 是如何从每个 InputSplit 中读取 3 行的呢?请有人解释这是怎么可能的。提前致谢!

4

1 回答 1

4

您需要了解实现TextInputFormat以发现答案。

让我们深入研究代码。我将谈论新的 mapreduce API,但“旧”的 mapred API 非常相似。

正如您所说,从用户的角度来看,aTextInputFormat根据一些换行符将拆分拆分为记录。让我们检查一下实现

你可以看到这个类几乎是空的。关键功能是createRecordInputFormat

@Override
public RecordReader<LongWritable, Text> createRecordReader(
        InputSplit split, 
        TaskAttemptContext context
) {
   return new LineRecordReader();
}

一般约定是使用 InputFormat 来获取 RecordReader。如果你往里看MapperMapContextImpl你会发现映射器只使用 RecordReader 来获取下一个键和值。他什么都不知道。

映射器:

public void run(Context context) throws IOException, InterruptedException {
  setup(context);
  while (context.nextKeyValue()) {
    map(context.getCurrentKey(), context.getCurrentValue(), context);
  }
  cleanup(context);

}

MapContextImpl:

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
  return reader.nextKeyValue();
}

现在重新仔细阅读您提供的此链接。你会看到:

  • NLinesInputFormat扩展TextInputFormat并且仅覆盖 createRecordReader. 基本上,而不是使用LineReader您提供自己的RecordReader. 您想要扩展TextInputFormat而不是层次结构中更高的另一个类,因为它已经处理了在这个级别完成的所有事情并且您可能需要(压缩、不可拆分格式等)
  • NLinesRecordReader做真正的工作。initialize它完成了从InputStream提供的InputSplit. 它还创建一个LineReader,与使用的相同TextInputFormat
  • 在该nextKeyValue方法中,您将看到它LineReader.readLine()被调用三次以获得三行(加上一些逻辑来正确处理极端情况,例如过大的记录、行尾、拆分结束)

希望对您有所帮助。关键是要了解 API 的整体设计以及每个部分如何相互交互。

于 2014-08-16T13:27:11.730 回答