我正在尝试在 hadoop 中执行以下操作:
- 我已经实现了一个将文件输出到目录“foo”的 map-reduce 作业。
- foo 文件采用 key=IntWriteable, value=IntWriteable 格式(使用 SequenceFileOutputFormat)。
- 现在,我想开始另一个 map-reduce 工作。映射器很好,但每个减速器都需要在启动时读取整个“foo”文件(我正在使用 HDFS 在减速器之间共享数据)。
我在“public void configure(JobConf conf)”上使用了这段代码:
String uri = "out/foo";
FileSystem fs = FileSystem.get(URI.create(uri), conf);
FileStatus[] status = fs.listStatus(new Path(uri));
for (int i=0; i<status.length; ++i) {
Path currFile = status[i].getPath();
System.out.println("status: " + i + " " + currFile.toString());
try {
SequenceFile.Reader reader = null;
reader = new SequenceFile.Reader(fs, currFile, conf);
IntWritable key = (IntWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
IntWritable value = (IntWritable ) ReflectionUtils.newInstance(reader.getValueClass(), conf);
while (reader.next(key, value)) {
// do the code for all the pairs.
}
}
}
该代码在单台机器上运行良好,但我不确定它是否会在集群上运行。换句话说,这段代码是从当前机器读取文件还是从分布式系统读取 id?
我正在尝试做的事情有更好的解决方案吗?
提前致谢,
阿里克。