
I am trying to do the following in Hadoop:

  1. I have implemented a map-reduce job that writes its output files to a directory "foo".
  2. The foo files hold key=IntWritable, value=IntWritable pairs (written with SequenceFileOutputFormat); a minimal sketch of such a job setup is shown right after this list.
  3. Now I want to start another map-reduce job. The mapper is fine, but each reducer needs to read all of the "foo" files when it starts up (I am using HDFS to share data between the reducers).
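
For reference, a minimal sketch of how such a first job might be set up with the old mapred API (matching the configure(JobConf) method used below). FirstJob, FirstMapper, FirstReducer and the "in" input path are placeholders, not my real classes:

JobConf job = new JobConf(FirstJob.class);
job.setMapperClass(FirstMapper.class);        // placeholder mapper
job.setReducerClass(FirstReducer.class);      // placeholder reducer
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
// Write the reducer output as SequenceFiles of IntWritable/IntWritable pairs.
job.setOutputFormat(SequenceFileOutputFormat.class);
FileInputFormat.setInputPaths(job, new Path("in"));       // placeholder input
FileOutputFormat.setOutputPath(job, new Path("out/foo")); // the "foo" directory
JobClient.runJob(job);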

I used this code in "public void configure(JobConf conf)":

String uri = "out/foo";
try {
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    FileStatus[] status = fs.listStatus(new Path(uri));
    for (int i = 0; i < status.length; ++i) {
        Path currFile = status[i].getPath();
        System.out.println("status: " + i + " " + currFile.toString());
        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(fs, currFile, conf);
            IntWritable key = (IntWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            IntWritable value = (IntWritable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
            while (reader.next(key, value)) {
                // do the code for all the pairs.
            }
        } finally {
            if (reader != null) {
                reader.close(); // close the reader for this file
            }
        }
    }
} catch (IOException e) {
    throw new RuntimeException(e);
}

The code works fine on a single machine, but I am not sure whether it will work on a cluster. In other words, does this code read the files from the current machine or from the distributed file system?

Is there a better solution for what I am trying to do?

Thanks in advance,

Arik.


1 Answer


The URI passed to FileSystem.get() does not have a scheme defined, so the file system that is used depends on the configuration parameter fs.defaultFS (fs.default.name in Hadoop 1.x). If it is not set, the default, i.e. the local file system, is used.
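
For example, which FileSystem you get back depends only on the scheme or its absence. A minimal sketch, assuming a Configuration is at hand; the namenode host/port is a placeholder:

Configuration conf = new Configuration();

// No scheme: resolved via fs.defaultFS; with no setting this is the local file system.
FileSystem defaultFs = FileSystem.get(URI.create("out/foo"), conf);

// Explicit hdfs:// scheme: always the distributed file system (placeholder namenode).
FileSystem hdfs = FileSystem.get(URI.create("hdfs://namenode:8020/"), conf);

// Explicit local file system, regardless of fs.defaultFS.
FileSystem local = FileSystem.getLocal(conf);

System.out.println(defaultFs.getUri() + " | " + hdfs.getUri() + " | " + local.getUri());

So on a cluster, either make sure fs.defaultFS points at your namenode or pass a fully qualified hdfs:// path.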

Your program therefore reads and writes under workingDir/out/foo on the local file system. It will run on the cluster as well, but it will still be looking at the local file system of each node rather than HDFS.

That said, I'm not sure why you need the entire contents of the foo directory in every reducer; you may want to consider other designs. If it really is needed, these files should be copied to HDFS first and read from the overridden setup method of your reducer. Needless to say, any files opened there have to be closed in the overridden cleanup method of your reducer. While the files can be read in reducers, map/reduce programs are not designed for this kind of functionality.
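
A minimal sketch of that setup/cleanup approach with the new (org.apache.hadoop.mapreduce) API. The "out/foo" path and the IntWritable/IntWritable types come from the question; the class name, the in-memory map and the reduce body are placeholders:

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Reducer;

public class FooReducer extends Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {

    // In-memory copy of the side data read from "out/foo" (placeholder design).
    private final Map<Integer, Integer> sideData = new HashMap<Integer, Integer>();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        // On a configured cluster fs.defaultFS points at HDFS, so this resolves to HDFS paths.
        FileSystem fs = FileSystem.get(conf);
        for (FileStatus status : fs.listStatus(new Path("out/foo"))) {
            Path file = status.getPath();
            if (file.getName().startsWith("_")) {
                continue; // skip _SUCCESS / _logs entries
            }
            SequenceFile.Reader reader = null;
            try {
                reader = new SequenceFile.Reader(fs, file, conf);
                IntWritable key = new IntWritable();
                IntWritable value = new IntWritable();
                while (reader.next(key, value)) {
                    sideData.put(key.get(), value.get());
                }
            } finally {
                IOUtils.closeStream(reader); // every file is closed before setup() returns
            }
        }
    }

    @Override
    protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // ... combine sideData with the incoming values ...
    }

    @Override
    protected void cleanup(Context context) {
        sideData.clear(); // nothing is left open; just release the in-memory copy
    }
}

Note that this keeps all of foo in each reducer's memory, which is only reasonable when that data is small; otherwise a join-style job is the more natural fit.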

answered 2013-04-17T19:31:43.270