-1

我正在分析日志文件的数据管道内运行 EMR 活动,当我的管道失败时出现以下错误:

线程“主”org.apache.hadoop.mapred.FileAlreadyExistsException 中的异常:输出目录 hdfs://10.211.146.177:9000/home/hadoop/temp-output-s3copy-2013-05-24-00 已经存在于 org。 apache.hadoop.mapred.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:121) at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:944) at org.apache.hadoop.mapred.JobClient$2.run( JobClient.java:905) 在 java.security.AccessController.doPrivileged(Native Method) 在 javax.security.auth.Subject.doAs(Subject.java:396) 在 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation. java:1132) 在 org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:905) 在 org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:879) 在 org.apache.hadoop.mapred .JobClient.runJob(JobClient.java:1316)在 com.valtira.datapipeline.stream.CloudFrontStreamLogProcessors.main(CloudFrontStreamLogProcessors.java:216) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) 在 sun .reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.apache.hadoop.util.RunJar.main(RunJar.java:187)org.apache.hadoop.util.RunJar.main(RunJar.java:187) 上的 reflect.Method.invoke(Method.java:597)org.apache.hadoop.util.RunJar.main(RunJar.java:187) 上的 reflect.Method.invoke(Method.java:597)

我尝试通过添加删除该文件夹:

文件系统 fs = FileSystem.get(getConf()); fs.delete(new Path("path/to/file"), true); // 删除文件,递归为真

但它不起作用。有没有办法在 java 中覆盖来自 Hadoop 的 FileOutputFormat 方法?有没有办法在java中忽略这个错误?

4

1 回答 1

0

当输出目录使用日期命名时,要删除的文件的路径会发生变化。有两种删除方法:

在外壳上,试试这个:

hadoop dfs -rmr hdfs://127.0.0.1:9000/home/hadoop/temp-output-s3copy-*

通过 Java 代码执行此操作:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.mortbay.log.Log;

public class FSDeletion {

  public static void main(String[] args) {

    try {
      Configuration conf = new Configuration();
      FileSystem fs = FileSystem.get(conf);

      String fsName = conf.get("fs.default.name", "localhost:9000");
      String baseDir = "/home/hadoop/";
      String outputDirPattern = fsName + baseDir + "temp-output-s3copy-";

      Path[] paths = new Path[1];
      paths[0] = new Path(baseDir);

      FileStatus[] status = fs.listStatus(paths);
      Path[] listedPaths = FileUtil.stat2Paths(status);
      for (Path p : listedPaths) {
        if (p.toString().startsWith(outputDirPattern)) {
          Log.info("Attempting to delete : " + p);
          boolean result = fs.delete(p, true);
          Log.info("Deleted ? : " + result);
        }
      }

      fs.close();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}
于 2013-05-29T21:31:19.127 回答