我在许多输入文件上运行 hadoop 作业。但是,如果其中一个文件损坏,整个工作就会失败。
我怎样才能让工作忽略损坏的文件?也许为我写一些计数器/错误日志,但不会让整个工作失败
这取决于你的工作失败的地方——如果一行损坏,并且在你的 map 方法中的某个地方抛出了一个异常,那么你应该能够用 try / catch 包装你的 map 方法的主体并记录错误:
protected void map(LongWritable key, Text value, Context context) {
try {
// parse value to a long
int val = Integer.parseInt(value.toString());
// do something with key and val..
} catch (NumberFormatException nfe) {
// log error and continue
}
}
但是如果错误是由 InputFormat 的 RecordReader 引发的,那么您需要修改 mappersrun(..)
方法 - 默认实现如下:
public void run(Context context) {
setup(context);
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
cleanup(context);
}
因此,您可以修改它以尝试捕获context.nextKeyValue()
调用中的异常,但您必须小心忽略阅读器抛出的任何错误 - 例如,IOExeption 可能无法通过忽略错误来“跳过”。
如果您已经编写了自己的 InputFormat / RecordReader,并且您有一个表示记录失败但允许您跳过并继续解析的特定异常,那么这样的事情可能会起作用:
public void run(Context context) {
setup(context);
while (true) {
try {
if (!context.nextKeyValue()) {
break;
} else {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
} catch (SkippableRecordException sre) {
// log error
}
}
cleanup(context);
}
但只是为了重新迭代-您的 RecordReader 必须能够在错误时恢复,否则上述代码可能会将您送入无限循环。
对于您的特定情况-如果您只想在第一次失败时忽略文件,那么您可以将 run 方法更新为更简单的方法:
public void run(Context context) {
setup(context);
try {
while (context.nextKeyValue()) {
map(context.getCurrentKey(), context.getCurrentValue(), context);
}
cleanup(context);
} catch (Exception e) {
// log error
}
}
最后的一些警告:
这就是故障陷阱在级联中的用途:
每当操作失败并抛出异常时,如果存在关联的陷阱,则将有问题的 Tuple 保存到陷阱 Tap 指定的资源中。这允许作业继续处理而不会丢失任何数据。
这基本上可以让您的工作继续进行,并让您稍后检查损坏的文件
如果您对流定义语句中的级联有点熟悉:
new FlowDef().addTrap( String branchName, Tap trap );
还有另一种可能的方式。您可以使用mapred.max.map.failures.percent
配置选项。当然,这种解决这个问题的方法也可以隐藏在地图阶段发生的一些其他问题。