7

我有一个任务,它在由输入记录的几个字段组织的多个目录中写入 avro 输出。

例如 :
各国历年的流程记录
并写入国家/年的目录结构
例如:
输出/usa/2015/outputs_usa_2015.avro
输出/uk/2014/outputs_uk_2014.avro
AvroMultipleOutputs multipleOutputs=new AvroMultipleOutputs(context);
....
....
     multipleOutputs.write("output", avroKey, NullWritable.get(), 
            OUTPUT_DIR + "/" + record.getCountry() + "/" + record.getYear() + "/outputs_" +record.getCountry()+"_"+ record.getYear());

下面的代码将使用哪个输出提交者来编写输出。与推测执行一起使用是否不安全?通过推测执行,这会导致(可能会导致)org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException

在这篇文章 中 Hadoop Reducer:如何使用推测执行输出到多个目录? 建议使用自定义输出提交器

hadoop AvroMultipleOutputs 的以下代码没有说明推测执行有任何问题

 private synchronized RecordWriter getRecordWriter(TaskAttemptContext taskContext,
          String baseFileName) throws IOException, InterruptedException {

    writer =
                ((OutputFormat) ReflectionUtils.newInstance(taskContext.getOutputFormatClass(),
                    taskContext.getConfiguration())).getRecordWriter(taskContext);
...
}

如果 baseoutput 路径位于作业目录之外,则 write 方法也不会记录任何问题

public void write(String namedOutput, Object key, Object value, String baseOutputPath)

在作业目录之外写入时,AvroMultipleOutputs(其他输出)是否存在具有推测执行的真正问题?如果,那么我如何覆盖 AvroMultipleOutputs 以拥有它自己的输出提交者。我在 AvroMultipleOutputs 中看不到它使用的输出提交者的任何输出格式

4

2 回答 2

1

AvroMultipleOutputs将使用OutputFormat您在作业配置中注册的内容,同时添加命名输出,例如使用来自(例如)的addNamedOutputAPI 。AvroMultipleOutputsAvroKeyValueOutputFormat

使用AvroMultipleOutputs,您可能无法使用推测性任务执行功能。即使覆盖它也无济于事或不简单。

相反,您应该编写自己的OutputFormat(很可能扩展一种可用的 Avro 输出格式,例如AvroKeyValueOutputFormat),并覆盖/实现其getRecordWriterAPI,它将返回一个RecordWriter实例MainRecordWriter(仅供参考)。

MainRecordWriter将维护RecordWriter(例如AvroKeyValueRecordWriter)实例的映射。这些RecordWriter实例中的每一个都属于输出文件之一。在writeAPI of 中MainRecordWriter,您将从地图中获取实际RecordWriter实例(基于您要写入的记录),并使用此记录写入器写入记录。因此MainRecordWriter,它只是作为多个RecordWriter实例的包装器工作。

对于一些类似的实现,您可能想从库中研究MultiStorage类的代码。piggybank

于 2015-05-14T20:01:53.290 回答
0

当您将命名输出添加到AvroMultipleOutputs时,它将调用AvroKeyOutputFormat.getRecordWriter()AvroKeyValueOutputFormat.getRecordWriter(),哪个调用AvroOutputFormatBase.getAvroFileOutputStream(),其内容为

protected OutputStream getAvroFileOutputStream(TaskAttemptContext context) throws IOException {
  Path path = new Path(((FileOutputCommitter)getOutputCommitter(context)).getWorkPath(),
    getUniqueFile(context,context.getConfiguration().get("avro.mo.config.namedOutput","part"),org.apache.avro.mapred.AvroOutputFormat.EXT));
  return path.getFileSystem(context.getConfiguration()).create(path);
}

AvroOutputFormatBase扩展FileOutputFormatgetOutputCommitter()上述方法中的实际上是对 . 的调用 FileOutputFormat.getOutputCommitter()。因此,AvroMultipleOutputs应该具有与 . 相同的约束MultipleOutputs

于 2015-05-14T23:22:51.863 回答