
Here is the background. My MapReduce job has the following input (example):

Apache Hadoop
Apache Lucene
StackOverflow
....

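(Each line actually represents a user query, but that doesn't matter here.) I want my RecordReader class to read one line and then pass several key-value pairs to the mapper. For example, if the RecordReader gets Apache Hadoop, I want it to generate the following key-value pairs and pass them to the mapper: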

Apache Hadoop - 1
Apache Hadoop - 2
Apache Hadoop - 3

("-" is just the separator here.) I found that RecordReader passes the key and value through its next() method:

next(key, value);

Each call to RecordReader.next() passes only one key and one value as arguments. So how can I achieve this?


3 Answers


I believe you can simply do this in the mapper:

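import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Nested inside your job's driver class
public static class MultiMapper 
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Number of key-value pairs to emit for each input line
    private static final int n = 3;

    @Override
    public void map(LongWritable key, Text value, Context context) 
            throws IOException, InterruptedException {
        // value is one input line; emit (line, 1), (line, 2), ..., (line, n)
        for (int i = 1; i <= n; i++) {
            context.write(value, new IntWritable(i));
        }
    }
}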

Here n is the number of values you want to emit per line. For example, for the key-value pairs you specified:

Apache Hadoop - 1
Apache Hadoop - 2
Apache Hadoop - 3

n is 3.
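To check what the mapper above emits without a cluster, here is a framework-free sketch of the same loop. `MultiMapperSketch`, `map` and `run` are hypothetical names, the `BiConsumer` merely stands in for Hadoop's `context.write`, and the " - " formatting only mimics the notation used in the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Framework-free sketch of MultiMapper.map(): the BiConsumer stands in for
// Hadoop's context.write(key, value). This is not the Hadoop API.
class MultiMapperSketch {

    // Calls emit(line, i) for i = 1..n, like the loop in map()
    static void map(String line, int n, BiConsumer<String, Integer> emit) {
        for (int i = 1; i <= n; i++) {
            emit.accept(line, i);
        }
    }

    // Runs map() once per input line and collects "key - value" strings
    static List<String> run(List<String> lines, int n) {
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            map(line, n, (k, v) -> out.add(k + " - " + v));
        }
        return out;
    }
}
```

For example, `MultiMapperSketch.run(List.of("Apache Hadoop"), 3)` returns `[Apache Hadoop - 1, Apache Hadoop - 2, Apache Hadoop - 3]`. The design point is that no custom RecordReader is needed: the default line-based input already calls `map()` once per line, and a mapper may call `context.write` as many times as it likes.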

answered 2013-05-29T06:55:45.137

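I think that if you want to send the same key to the mapper several times, you have to implement your own RecordReader. For example, you could write a MultiRecordReader based on LineRecordReader and change its nextKeyValue method. Here is the original nextKeyValue code from LineRecordReader: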

public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new LongWritable();
    }
    key.set(pos);
    if (value == null) {
      value = new Text();
    }
    int newSize = 0;
    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    while (getFilePosition() <= end) {
      newSize = in.readLine(value, maxLineLength,
          Math.max(maxBytesToConsume(pos), maxLineLength));
      pos += newSize;
      if (newSize < maxLineLength) {
        break;
      }

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + 
               (pos - newSize));
    }
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    } else {
      return true;
    }
  }

You could change it like this:

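// Assumes the copied class is now a RecordReader<Text, Text> with these fields
// changed or added (getCurrentKey()/getCurrentValue() return them as-is):
//   private Text key;               // the line itself (was LongWritable)
//   private Text value;             // the counter "1", "2", "3" (was the line)
//   private int n = 0;              // times the current line has been returned
//   private static final int REPEAT = 3;
public boolean nextKeyValue() throws IOException {
    if (key == null) {
      key = new Text();
    }
    if (value == null) {
      value = new Text();
    }
    // Still repeating the current line: bump the counter without reading
    if (n > 0 && n < REPEAT) {
      n++;
      value.set(Integer.toString(n));
      return true;
    }
    int newSize = 0;
    // We always read one extra line, which lies outside the upper
    // split limit i.e. (end - 1)
    while (getFilePosition() <= end) {
      // Read the line into the key instead of the value
      newSize = in.readLine(key, maxLineLength,
          Math.max(maxBytesToConsume(pos), maxLineLength));
      pos += newSize;
      if (newSize < maxLineLength) {
        break;
      }

      // line too long. try again
      LOG.info("Skipped line of size " + newSize + " at pos " + 
               (pos - newSize));
    }
    if (newSize == 0) {
      key = null;
      value = null;
      return false;
    }
    // First pair for a freshly read line
    n = 1;
    value.set(Integer.toString(n));
    return true;
  }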

I think this should work for you.
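Instead of copying LineRecordReader, the same idea can be built by delegation: wrap a line reader and hand out each line several times before asking it for the next one. This also sidesteps the fact that, in the Hadoop versions I am aware of, LineRecordReader's fields such as `pos` and `in` are private, so a subclass cannot reach them. Below is a framework-free sketch of that state machine; `RepeatingLineReader` is a hypothetical name, and the `Iterator<String>` stands in for the wrapped LineRecordReader. It only mirrors the `nextKeyValue()`/`getCurrentKey()`/`getCurrentValue()` pattern and is not the Hadoop API.

```java
import java.util.Iterator;

// Framework-free sketch: returns each underlying line `repeat` times, paired
// with a counter 1..repeat. The Iterator stands in for a wrapped
// LineRecordReader; this is not the Hadoop API.
class RepeatingLineReader {
    private final Iterator<String> lines; // stand-in for the delegate reader
    private final int repeat;             // pairs to produce per line
    private String currentLine;           // line being repeated
    private int count;                    // pairs produced so far for currentLine

    RepeatingLineReader(Iterator<String> lines, int repeat) {
        if (repeat < 1) {
            throw new IllegalArgumentException("repeat must be >= 1");
        }
        this.lines = lines;
        this.repeat = repeat;
    }

    // Advances to the next (line, counter) pair; false when input is exhausted
    boolean nextKeyValue() {
        if (currentLine != null && count < repeat) {
            count++;                 // same line, next counter value
            return true;
        }
        if (!lines.hasNext()) {      // delegate has no more lines
            currentLine = null;
            return false;
        }
        currentLine = lines.next();  // read a new line only after `repeat` pairs
        count = 1;
        return true;
    }

    String getCurrentKey() {
        return currentLine;
    }

    int getCurrentValue() {
        return count;
    }
}
```

Reading `List.of("Apache Hadoop", "Apache Lucene")` with `repeat = 3` yields the pairs (Apache Hadoop, 1), (Apache Hadoop, 2), (Apache Hadoop, 3), (Apache Lucene, 1), (Apache Lucene, 2), (Apache Lucene, 3). In a real job the same logic would sit in a custom RecordReader that calls the wrapped reader's `nextKeyValue()` only when the counter wraps around.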

answered 2013-05-29T08:40:55.550

Try emitting the records without a key:

context.write(NullWritable.get(), new Text("Apache Hadoop - 1"));
context.write(NullWritable.get(), new Text("Apache Hadoop - 2"));
context.write(NullWritable.get(), new Text("Apache Hadoop - 3"));
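With a `NullWritable` key, the line and the counter have to travel together in the value. The framework-free sketch below builds those value strings and splits them back apart; `NullKeySketch`, `values` and `unpack` are hypothetical helpers, and the " - " separator is taken from the question. It is not the Hadoop API.

```java
import java.util.ArrayList;
import java.util.List;

// Framework-free sketch of the NullWritable variant: the key carries no
// information, so line and counter are packed into one value string.
class NullKeySketch {
    static final String SEPARATOR = " - ";

    // Builds the value strings that would be written with NullWritable.get()
    static List<String> values(String line, int n) {
        List<String> out = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            out.add(line + SEPARATOR + i);
        }
        return out;
    }

    // Splits a packed value into {line, counter}; lastIndexOf keeps lines
    // that themselves contain " - " intact
    static String[] unpack(String value) {
        int idx = value.lastIndexOf(SEPARATOR);
        return new String[] {
            value.substring(0, idx),
            value.substring(idx + SEPARATOR.length())
        };
    }
}
```

One consequence of this design: since every record shares the same `NullWritable` key, a reduce phase (if the job has one) would see all of these values grouped under that single key.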
answered 2013-05-29T08:28:47.303