I have the following directory structure:

Dir1
 |___Dir2 
  |___Dir3
   |___Dir4
     |___File1.gz
     |___File2.gz
     |___File3.gz

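The subdirectories are purely nested; none of them contains any files except Dir4.

I am trying to recurse through the directory on HDFS with the code below. If a path is a directory, I append /* to it and call addInputPath: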

arg[0] = "path/to/Dir1"; // given at command line

FileStatus fs = new FileStatus(); 
Path q = new Path(args[0]); 
FileInputFormat.addInputPath(job,q);

Path p = new Path(q.toString()+"/*");
fs.setPath(p);  

while(fs.isDirectory())
{
    fs.setPath(new Path(p.toString()+"/*"));
    FileInputFormat.addInputPath(job,fs.getPath());
}           

But the code never seems to enter the while loop, and I get a "not a File" exception.

1 Answer

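Where is the if statement you are referring to?
In any case, you can take a look at these utility methods, which add all the files under a directory to the job's input: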

Utils:

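import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class Utils {

  // Returns every file found under basePath, descending into subdirectories.
  public static Path[] getRecursivePaths(FileSystem fs, String basePath)
    throws IOException, URISyntaxException {
    List<Path> result = new ArrayList<Path>();
    // Prefix the file system URI; this assumes basePath is absolute.
    basePath = fs.getUri() + basePath;
    FileStatus[] listStatus = fs.globStatus(new Path(basePath + "/*"));
    for (FileStatus fstat : listStatus) {
      readSubDirectory(fstat, basePath, fs, result);
    }
    return result.toArray(new Path[result.size()]);
  }

  // Adds fileStatus to paths if it is a file; otherwise recurses into it.
  // The FileStatus objects come from the FileSystem, so isDirectory() is real.
  private static void readSubDirectory(FileStatus fileStatus, String basePath,
    FileSystem fs, List<Path> paths) throws IOException, URISyntaxException {
    if (!fileStatus.isDirectory()) {
      paths.add(fileStatus.getPath());
    } else {
      String subPath = fileStatus.getPath().toString();
      FileStatus[] listStatus = fs.globStatus(new Path(subPath + "/*"));
      if (listStatus.length == 0) {
        // An empty directory is added as an input path itself.
        paths.add(fileStatus.getPath());
      }
      for (FileStatus fst : listStatus) {
        readSubDirectory(fst, subPath, fs, paths);
      }
    }
  }
}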

Use it in your job runner class:

...
Path[] inputPaths = Utils.getRecursivePaths(fs, inputPath);
FileInputFormat.setInputPaths(job, inputPaths);
...
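
As a side note on the loop in the question: a FileStatus created with new FileStatus() never queries HDFS, and setPath only changes the stored path, so isDirectory() stays false and the loop body is skipped. Status objects need to come from the FileSystem, as the globStatus calls above do. To try the recursion itself without a cluster, here is a minimal sketch that runs the same "file or directory?" walk on the local file system using only the JDK. It is a stand-in, not the Hadoop API: the class name LocalRecursiveListing and the method collectFiles are made up for illustration, and Path here is java.nio.file.Path rather than org.apache.hadoop.fs.Path. Unlike the utility above, it does not add empty directories to the result.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical local-file-system analogue of the recursive listing above.
public class LocalRecursiveListing {

    // Collects every non-directory entry below dir, descending into subdirectories.
    static void collectFiles(Path dir, List<Path> out) throws IOException {
        try (DirectoryStream<Path> children = Files.newDirectoryStream(dir)) {
            for (Path child : children) {
                if (Files.isDirectory(child)) {
                    collectFiles(child, out);   // directory: recurse
                } else {
                    out.add(child);             // file: keep it
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // Build a throwaway Dir1/Dir2/Dir3/Dir4 tree like the one in the question.
        Path root = Files.createTempDirectory("Dir1");
        Path leaf = Files.createDirectories(
            root.resolve("Dir2").resolve("Dir3").resolve("Dir4"));
        for (String name : new String[] {"File1.gz", "File2.gz", "File3.gz"}) {
            Files.createFile(leaf.resolve(name));
        }

        List<Path> found = new ArrayList<>();
        collectFiles(root, found);
        for (Path p : found) {
            System.out.println(p);
        }
    }
}
```

The directory check is made against the file system at each level, which is the part the loop in the question was missing.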
answered 2013-07-13T21:26:57.917