2

我正在编写一个 M/R 作业,它处理以二进制格式编写的大型时间序列数据文件,看起来像这样(为了便于阅读,这里的新行,实际数据是连续的,显然):

TIMESTAMP_1---------------------TIMESTAMP_1
TIMESTAMP_2**********TIMESTAMP_2 
TIMESTAMP_3%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%TIMESTAMP_3
.. etc

其中 timestamp 只是一个 8 字节的结构,可以通过前 2 个字节来识别。如上所示,实际数据在重复值时间戳之间有界,并且包含一个或多个预定义结构。我想编写一个自定义 InputFormat,它将向映射器发出键/值对:

< TIMESTAMP_1, --------------------- >
< TIMESTAMP_2, ********** >
< TIMESTAMP_3, %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% >

从逻辑上讲,我想跟踪当前的TIMESTAMP,并汇总所有数据,直到TIMESTAMP再次检测到该数据,然后将我的<TIMESTAMP, DATA>对作为记录发送出去。我的问题是在内部拆分之间同步RecordReader,所以如果某个读者收到以下拆分

# a split occurs inside my data
reader X: TIMESTAMP_1--------------
reader Y: -------TIMESTAMP_1 TIMESTAMP_2****..

# or inside the timestamp
or even: @@@@@@@TIMES
         TAMP_1-------------- ..

解决这个问题的好方法是什么?我是否有一种简单的方法来访问文件偏移量,以便我CustomRecordReader可以在拆分之间同步而不会丢失数据?我觉得我在如何处理拆分方面存在一些概念上的空白,因此对这些问题的解释可能会有所帮助。谢谢。

4

2 回答 2

3

一般来说,创建支持拆分的输入格式并不简单,因为您应该能够找出从拆分边界移动到哪里以获得一致的记录。XmlInputFormat 是这样做的格式的一个很好的例子。
我建议首先考虑您是否确实需要可拆分的输入?您可以将输入格式定义为不可拆分,并且不会出现所有这些问题。
如果您的文件通常不会比块大小大很多 - 您不会丢失任何内容。如果他们这样做 - 您将失去部分数据局部性。

于 2012-05-10T12:39:19.180 回答
2

您可以子类化 的具体子类FileInputFormat,例如 ,SeqenceFileAsBinaryInputFormat并覆盖isSplitable()方法以返回false

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;

public class NonSplitableBinaryFile extends SequenceFileAsBinaryInputFormat{

  @Override
  protected boolean isSplitable(FileSystem fs, Path file) {
      return false;
  }

  @Override
  public RecordReader getRecordReader(InputSplit split, JobConf job,
  Reporter reporter) throws IOException {
    //return your customized record reader here
  }
}
于 2012-05-10T11:43:40.847 回答