2

我正在使用 Hadoop 0.20.2(无法更改)并且我想在我的输入路径中添加一个过滤器。数据如下所示:

/path1/test_a1
/path1/test_a2
/path1/train_a1
/path1/train_a2

我只想处理所有带有火车的文件。

看一下 FileInputFormat 类建议使用:

 FileInputFormat.setInputPathFilter(Job job, Class<? extends PathFilter> filter)

这就是我的问题开始的地方,因为PathFilter是一个接口 - 当然,我可以扩展接口,但我仍然没有实现。因此,我实现了接口:

class TrainFilter implements PathFilter
{
   boolean accept(Path path)
   {
      return path.toString().contains("train");
   }
}

当我使用 TrainFilter 作为 PathFilter 代码编译时,但是当我运行它时,由于输入路径被搞砸了,我得到一个异常。如果不设置过滤器,我的代码会运行 /path1 下的所有文件,但是,在设置过滤器时,它会抛出错误:

InvalidInputException: Input path does not exist hdfs://localhost:9000/path1

以下是我在驱动程序代码中的设置方式:

job.setMapperClass(....class);
job.setInputFormatClass(....class);
job.setMapOutputKeyClass(...class);
job.setMapOutputValueClass(Text.class);
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);

FileInputFormat.setInputPathFilter(job, TrainFilter.class);
FileInputFormat.addInputPath(job, new Path("/path1/"));
FileOutputFormat.setOutputPath(job, new Path("/path2/"));
job.waitForCompletion(true);

关于我在这里做错了什么的任何建议?

编辑:我发现了问题。对 PathFilter 的第一次调用始终是目录本身 (/path1),因为它不包含 ("train"),所以目录本身是无效的,因此会引发异常。这让我想到了另一个问题:我如何测试任意路径是否是目录?据我所知,我需要一个对 FileSystem 的引用,它不是 PathFilter 的默认参数之一。

4

4 回答 4

6

或者,您可以尝试遍历给定目录中的所有文件并检查文件名是否以train开头。例如:

        Job job = new Job(conf, "myJob");
        List<Path> inputhPaths = new ArrayList<Path>();

        String basePath = "/user/hadoop/path";
        FileSystem fs = FileSystem.get(conf);
        FileStatus[] listStatus = fs.globStatus(new Path(basePath + "/train*"));
        for (FileStatus fstat : listStatus) {
            inputhPaths.add(fstat.getPath());
        }

        FileInputFormat.setInputPaths(job,
                (Path[]) inputhPaths.toArray(new Path[inputhPaths.size()]));
于 2012-11-19T12:57:49.617 回答
2

快速修复,如果路径包含“test”,您可以将路径列入黑名单而不是像 return false 那样列入白名单

于 2014-05-06T19:27:57.943 回答
1

您可以通过让 Filter 实现 Configurable 接口(或扩展 Configured 类)来获取 FileSystem 实例,并在 setConf 方法中创建 fileSystem 实例变量:

class TrainFilter extends Configured implements PathFilter
{
   FileSystem fileSystem;

   boolean accept(Path path)
   {
      // TODO: use fileSystem here to determine if path is a directory
      return path.toString().contains("train");
   }

   public void setConf(Configuration conf) {
     if (conf != null) {
       fileSystem = FileSystem.get(conf);
     }
   }
}
于 2012-11-27T01:32:29.843 回答
0

我知道这是一个非常古老的问题,但是当它的所有示例都失败时,它帮助我找到了排除路径的答案,例如这里记录的。

我只是想警告@ChrisWhite 给出的答案,因为我正在使用 Hadoop 3.3.0 API 并且当我要从配置中拉出一些东西时,使用 setConf 抛出了 NullPointerException。我发现相反,我将保留extends Configured在 PathFilter 签名中,但getConf().get(<your configuration parameter name>)只要您需要从作业配置中获得所需的内容,就可以将其放入。

所以,我的排除 PathFilter 看起来像这样:

public static class ExcludedPathsFilter extends Configured implements PathFilter {

    public boolean accept(Path includePathGlob){
        //debugging
        System.out.println("excludedPath parameter is "+getConf().get("excludedPath")+", includePath parameter is "+includePathGlob.toString()+" and !includePathGlob.toString().contains(getConf().get(\"excludedPath\")) is "+!includePathGlob.toString().contains(getConf().get("excludedPath")));
        return !includePathGlob.toString().contains(getConf().get("excludedPath"));
    }

}
于 2021-01-06T15:03:13.973 回答