我已经编写了一个 mapreduce 程序来处理日志。除了实际输出之外,该作业还将辅助数据写入驱动程序代码中设置的输出路径之外的不同位置。但是启用推测执行后,不会删除已终止任务尝试的输出。是有没有办法避免这个问题?除了写入正常输出位置并在作业完成后复制到外部位置之外,是否可以解决问题?
是否可以使用“OutputCommitter”解决此问题?
有没有人试过这个?任何帮助,将不胜感激。
是的,可以使用 FileOutputCommitter 在任务成功时将临时任务目录的内容移动到最终输出目录,并删除原始任务目录。
我相信大多数在 Hadoop 中扩展 FileOutputFormat 的内置输出格式都使用 OutputCommitter,默认情况下是 FileOutputCommitter。
这是来自 FileOutputFormat 的代码
public synchronized
OutputCommitter getOutputCommitter(TaskAttemptContext context
) throws IOException {
if (committer == null) {
Path output = getOutputPath(context);
committer = new FileOutputCommitter(output, context);
}
return committer;
}
要写入多个路径,您可能可以查看MultipleOutputs,默认情况下使用 OutputCommitter。
或者您可以创建自己的输出格式并扩展 FileOutputFomat 并覆盖 FileOutputFormat 中的上述函数,创建自己的 OutputCommitter 实现查看 FileOutputCommitter 代码。
在 FileOoutputcommitter 代码中,您将找到您可能感兴趣的函数 -
/**
* Delete the work directory
*/
@Override
public void abortTask(TaskAttemptContext context) {
try {
if (workPath != null) {
context.progress();
outputFileSystem.delete(workPath, true);
}
} catch (IOException ie) {
LOG.warn("Error discarding output" + StringUtils.stringifyException(ie));
}
}
如果任务成功,则调用 commitTask(),在默认实现中,它将临时任务输出目录(其名称中包含任务尝试 ID 以避免任务尝试之间的冲突)移动到最终输出路径 ${mapred.out把.dir}。否则,框架调用 abortTask(),删除临时任务输出目录。
为避免在 mapreduce 输出文件夹中创建 _logs 和 _SUCCESS 文件,您可以使用以下设置:
conf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false); conf.set("hadoop.job.history.user.location", "none");