当我每天运行我的 Hadoop 作业时,我想覆盖/重用现有的输出目录。实际上输出目录将存储每天作业运行结果的汇总输出。如果我指定相同的输出目录,则会给出错误“输出目录已存在”。
如何绕过此验证?
当我每天运行我的 Hadoop 作业时,我想覆盖/重用现有的输出目录。实际上输出目录将存储每天作业运行结果的汇总输出。如果我指定相同的输出目录,则会给出错误“输出目录已存在”。
如何绕过此验证?
在运行作业之前删除目录怎么样?
您可以通过 shell 执行此操作:
hadoop fs -rmr /path/to/your/output/
或通过 Java API:
// configuration should contain reference to your namenode
FileSystem fs = FileSystem.get(new Configuration());
// true stands for recursively deleting the folder you gave
fs.delete(new Path("/path/to/your/output"), true);
Jungblut 的答案是您的直接解决方案。由于我从不信任自动化流程来删除东西(我个人),我会建议一个替代方案:
我建议您不要尝试覆盖,而是让您的工作的输出名称动态化,包括它运行的时间。
“”之类的东西/path/to/your/output-2011-10-09-23-04/
。这样,您可以保留旧的工作输出,以防您需要重新访问。在我的系统中,每天运行 10 多个工作,我们将输出构造为:/output/job1/2011/10/09/job1out/part-r-xxxxx
、、/output/job1/2011/10/10/job1out/part-r-xxxxx
等。
Hadoop TextInputFormat
(我猜你正在使用)不允许覆盖现有目录。可能是为了原谅你发现你错误地删除了你(和你的集群)非常努力的东西的痛苦。
但是,如果您确定希望您的输出文件夹被作业覆盖,我相信最干净的方法是进行TextOutputFormat
如下更改:
public class OverwriteTextOutputFormat<K, V> extends TextOutputFormat<K, V>
{
public RecordWriter<K, V>
getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException
{
Configuration conf = job.getConfiguration();
boolean isCompressed = getCompressOutput(job);
String keyValueSeparator= conf.get("mapred.textoutputformat.separator","\t");
CompressionCodec codec = null;
String extension = "";
if (isCompressed)
{
Class<? extends CompressionCodec> codecClass =
getOutputCompressorClass(job, GzipCodec.class);
codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
extension = codec.getDefaultExtension();
}
Path file = getDefaultWorkFile(job, extension);
FileSystem fs = file.getFileSystem(conf);
FSDataOutputStream fileOut = fs.create(file, true);
if (!isCompressed)
{
return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
}
else
{
return new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),keyValueSeparator);
}
}
}
现在您正在使用 overwrite=true创建FSDataOutputStream
( )。fs.create(file, true)
Hadoop already supports the effect you seem to be trying to achieve by allowing multiple input paths to a job. Instead of trying to have a single directory of files to which you add more files, have a directory of directories to which you add new directories. To use the aggregate result as input, simply specify the input glob as a wildcard over the subdirectories (e.g., my-aggregate-output/*
). To "append" new data to the aggregate as output, simply specify a new unique subdirectory of the aggregate as the output directory, generally using a timestamp or some sequence number derived from your input data (e.g. my-aggregate-output/20140415154424
).
如果将输入文件(例如,附加条目)从本地文件系统加载到 hadoop 分布式文件系统,如下所示:
hdfs dfs -put /mylocalfile /user/cloudera/purchase
然后也可以用 . 覆盖/重用现有的输出目录-f
。无需删除或重新创建文件夹
hdfs dfs -put -f /updated_mylocalfile /user/cloudera/purchase
Hadoop 遵循“一次写入,多次读取”的理念。因此,当您尝试再次写入目录时,它假定它必须创建一个新目录(写入一次)但它已经存在,因此它会抱怨。您可以通过删除它hadoop fs -rmr /path/to/your/output/
。最好创建一个动态目录(例如,基于时间戳或哈希值)以保存数据。
您可以按时间为每次执行创建一个输出子目录。例如,假设您期望用户的输出目录,然后将其设置如下:
FileOutputFormat.setOutputPath(job, new Path(args[1]);
通过以下几行更改它:
String timeStamp = new SimpleDateFormat("yyyy.MM.dd.HH.mm.ss", Locale.US).format(new Timestamp(System.currentTimeMillis()));
FileOutputFormat.setOutputPath(job, new Path(args[1] + "/" + timeStamp));
您需要在主类中添加设置:
//Configuring the output path from the filesystem into the job
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//auto_delete output dir
OutputPath.getFileSystem(conf).delete(OutputPath);
我遇到了这个确切的问题,它源于checkOutputSpecs
class 中引发的异常FileOutputFormat
。就我而言,我希望有很多工作将文件添加到已经存在的目录中,并且我保证这些文件将具有唯一的名称。
我通过创建一个输出格式类来解决它,该类只覆盖checkOutputSpecs
方法并窒息(忽略)FileAlreadyExistsException
它检查目录是否已经存在的地方。
public class OverwriteTextOutputFormat<K, V> extends TextOutputFormat<K, V> {
@Override
public void checkOutputSpecs(JobContext job) throws IOException {
try {
super.checkOutputSpecs(job);
}catch (FileAlreadyExistsException ignored){
// Suffocate the exception
}
}
}
在工作配置中,我使用LazyOutputFormat
了MultipleOutputs
.
LazyOutputFormat.setOutputFormatClass(job, OverwriteTextOutputFormat.class);
我有一个类似的用例,我MultipleOutputs
用来解决这个问题。
例如,如果我希望不同的 MapReduce 作业写入同一个目录/outputDir/
。作业 1 写入/outputDir/job1-part1.txt
,作业 2 写入/outputDir/job1-part2.txt
(不删除现有文件)。
在 main 中,将输出目录设置为随机目录(可以在新作业运行之前将其删除)
FileInputFormat.addInputPath(job, new Path("/randomPath"));
在 reducer/mapper 中,使用MultipleOutputs
并设置 writer 写入所需的目录:
public void setup(Context context) {
MultipleOutputs mos = new MultipleOutputs(context);
}
和:
mos.write(key, value, "/outputDir/fileOfJobX.txt")
但是,我的用例比这要复杂一些。如果只是写入同一个平面目录,您可以写入不同的目录并运行脚本来迁移文件,例如:hadoop fs -mv /tmp/* /outputDir
在我的用例中,每个 MapReduce 作业根据正在写入的消息的值写入不同的子目录。目录结构可以是多层的,例如:
/outputDir/
messageTypeA/
messageSubTypeA1/
job1Output/
job1-part1.txt
job1-part2.txt
...
job2Output/
job2-part1.txt
...
messageSubTypeA2/
...
messageTypeB/
...
每个 Mapreduce 作业可以写入数千个子目录。写入 tmp 目录并将每个文件移动到正确目录的成本很高。