22

当我每天运行我的 Hadoop 作业时,我想覆盖/重用现有的输出目录。实际上输出目录将存储每天作业运行结果的汇总输出。如果我指定相同的输出目录,则会给出错误“输出目录已存在”。

如何绕过此验证?

4

10 回答 10

17

在运行作业之前删除目录怎么样?

您可以通过 shell 执行此操作:

hadoop fs -rmr /path/to/your/output/

或通过 Java API:

// configuration should contain reference to your namenode
FileSystem fs = FileSystem.get(new Configuration());
// true stands for recursively deleting the folder you gave
fs.delete(new Path("/path/to/your/output"), true);
于 2011-10-10T13:53:58.440 回答
12

Jungblut 的答案是您的直接解决方案。由于我从不信任自动化流程来删除东西(我个人),我会建议一个替代方案:

我建议您不要尝试覆盖,而是让您的工作的输出名称动态化,包括它运行的时间。

“”之类的东西/path/to/your/output-2011-10-09-23-04/。这样,您可以保留旧的工作输出,以防您需要重新访问。在我的系统中,每天运行 10 多个工作,我们将输出构造为:/output/job1/2011/10/09/job1out/part-r-xxxxx、、/output/job1/2011/10/10/job1out/part-r-xxxxx等。

于 2011-10-10T14:10:51.003 回答
5

Hadoop TextInputFormat(我猜你正在使用)不允许覆盖现有目录。可能是为了原谅你发现你错误地删除了你(和你的集群)非常努力的东西的痛苦。

但是,如果您确定希望您的输出文件夹被作业覆盖,我相信最干净的方法是进行TextOutputFormat如下更改:

public class OverwriteTextOutputFormat<K, V> extends TextOutputFormat<K, V>
{
      public RecordWriter<K, V> 
      getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException 
      {
          Configuration conf = job.getConfiguration();
          boolean isCompressed = getCompressOutput(job);
          String keyValueSeparator= conf.get("mapred.textoutputformat.separator","\t");
          CompressionCodec codec = null;
          String extension = "";
          if (isCompressed) 
          {
              Class<? extends CompressionCodec> codecClass = 
                      getOutputCompressorClass(job, GzipCodec.class);
              codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
              extension = codec.getDefaultExtension();
          }
          Path file = getDefaultWorkFile(job, extension);
          FileSystem fs = file.getFileSystem(conf);
          FSDataOutputStream fileOut = fs.create(file, true);
          if (!isCompressed) 
          {
              return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
          } 
          else 
          {
              return new LineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)),keyValueSeparator);
          }
      }
}

现在您正在使用 overwrite=true创建FSDataOutputStream( )。fs.create(file, true)

于 2014-08-05T07:55:29.450 回答
1

Hadoop already supports the effect you seem to be trying to achieve by allowing multiple input paths to a job. Instead of trying to have a single directory of files to which you add more files, have a directory of directories to which you add new directories. To use the aggregate result as input, simply specify the input glob as a wildcard over the subdirectories (e.g., my-aggregate-output/*). To "append" new data to the aggregate as output, simply specify a new unique subdirectory of the aggregate as the output directory, generally using a timestamp or some sequence number derived from your input data (e.g. my-aggregate-output/20140415154424).

于 2015-04-15T15:45:50.820 回答
1

如果将输入文件(例如,附加条目)从本地文件系统加载到 hadoop 分布式文件系统,如下所示:

hdfs dfs -put  /mylocalfile /user/cloudera/purchase

然后也可以用 . 覆盖/重用现有的输出目录-f。无需删除或重新创建文件夹

hdfs dfs -put -f  /updated_mylocalfile /user/cloudera/purchase
于 2018-10-16T12:03:32.197 回答
1

Hadoop 遵循“一次写入,多次读取”的理念。因此,当您尝试再次写入目录时,它假定它必须创建一个新目录(写入一次)但它已经存在,因此它会抱怨。您可以通过删除它hadoop fs -rmr /path/to/your/output/。最好创建一个动态目录(例如,基于时间戳或哈希值)以保存数据。

于 2019-01-28T11:16:34.087 回答
0

您可以按时间为每次执行创建一个输出子目录。例如,假设您期望用户的输出目录,然后将其设置如下:

FileOutputFormat.setOutputPath(job, new Path(args[1]);

通过以下几行更改它:

String timeStamp = new SimpleDateFormat("yyyy.MM.dd.HH.mm.ss", Locale.US).format(new Timestamp(System.currentTimeMillis()));
FileOutputFormat.setOutputPath(job, new Path(args[1] + "/" + timeStamp));
于 2016-08-05T21:59:31.080 回答
0

您需要在主类中添加设置:

//Configuring the output path from the filesystem into the job
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//auto_delete output dir
OutputPath.getFileSystem(conf).delete(OutputPath);
于 2020-04-20T02:40:14.377 回答
0

我遇到了这个确切的问题,它源于checkOutputSpecsclass 中引发的异常FileOutputFormat。就我而言,我希望有很多工作将文件添加到已经存在的目录中,并且我保证这些文件将具有唯一的名称。

我通过创建一个输出格式类来解决它,该类只覆盖checkOutputSpecs方法并窒息(忽略)FileAlreadyExistsException它检查目录是否已经存在的地方。

public class OverwriteTextOutputFormat<K, V> extends TextOutputFormat<K, V> {
    @Override
    public void checkOutputSpecs(JobContext job) throws IOException {
        try {
            super.checkOutputSpecs(job);
        }catch (FileAlreadyExistsException ignored){
            // Suffocate the exception
        }
    }
}

在工作配置中,我使用LazyOutputFormatMultipleOutputs.

LazyOutputFormat.setOutputFormatClass(job, OverwriteTextOutputFormat.class);
于 2019-07-16T00:39:33.187 回答
0

我有一个类似的用例,我MultipleOutputs用来解决这个问题。

例如,如果我希望不同的 MapReduce 作业写入同一个目录/outputDir/。作业 1 写入/outputDir/job1-part1.txt,作业 2 写入/outputDir/job1-part2.txt(不删除现有文件)。

在 main 中,将输出目录设置为随机目录(可以在新作业运行之前将其删除)

FileInputFormat.addInputPath(job, new Path("/randomPath"));

在 reducer/mapper 中,使用MultipleOutputs并设置 writer 写入所需的目录:

public void setup(Context context) {
    MultipleOutputs mos = new MultipleOutputs(context);
}

和:

mos.write(key, value, "/outputDir/fileOfJobX.txt")

但是,我的用例比这要复杂一些。如果只是写入同一个平面目录,您可以写入不同的目录并运行脚本来迁移文件,例如:hadoop fs -mv /tmp/* /outputDir

在我的用例中,每个 MapReduce 作业根据正在写入的消息的值写入不同的子目录。目录结构可以是多层的,例如:

/outputDir/
    messageTypeA/
        messageSubTypeA1/
            job1Output/
                job1-part1.txt
                job1-part2.txt
                ...
            job2Output/
                job2-part1.txt
                ...

        messageSubTypeA2/
        ...
    messageTypeB/
    ...

每个 Mapreduce 作业可以写入数千个子目录。写入 tmp 目录并将每个文件移动到正确目录的成本很高。

于 2018-09-07T04:27:52.793 回答