0

这是场景

           Reducer1  
         /  
Mapper - - Reducer2  
         \   
           ReducerN  

在减速器中,我想将数据写入不同的文件,假设减速器看起来像

def reduce():  
  for line in sys.STDIN:  
    if(line == type1):
      create_type_1_file(line)
    if(line == type2):
      create_type_2_file(line)
    if(line == type3):
      create_type3_file(line)
      ... and so on  
def create_type_1_file(line):
  # writes to file1  
def create_type2_file(line):
  # writes to file2  
def create_type_3_file(line):
  # write to file 3  

考虑写成的路径:

file1 = /home/user/data/file1  
file2 = /home/user/data/file2  
file3 = /home/user/data/file3  

当我运行时pseudo-distributed mode(machine with one node and hdfs daemons running),一切都很好,因为所有守护进程都会写入同一组文件

问题: - 如果我在 1000 台机器的集群中运行它,它们是否会写入同一组文件?我writing to local filesystem在这种情况下,有没有更好的方法来执行这个操作hadoop streaming

4

2 回答 2

0

通常,reduce 的 o/p 会写入 HDFS 等可靠的存储系统,因为如果其中一个节点出现故障,那么与该节点关联的 reduce 数据就会丢失。不可能在 Hadoop 框架的上下文之外再次运行特定的 reduce 任务。此外,一旦作业完成,来自 1000 个节点的 o/p 必须针对不同的输入类型进行合并。

HDFS不支持并发写入。可能存在多个减速器可能正在写入 HDFS 中的同一个文件的情况,这可能会损坏文件。当多个reduce 任务在单个节点上运行时,在写入单个本地文件时并发可能也是一个问题。

一种解决方案是使用reduce 任务特定的文件名,然后将所有文件组合为特定的输入类型。

于 2011-10-11T01:38:31.333 回答
0

可以使用 MultipleOutputs 类从 Reducer 将输出写入多个位置。您可以将 file1、file2 和 file3 视为三个文件夹,并将 1000 个 Reducer 的输出数据分别写入这些文件夹。


作业提交的使用模式:

 Job job = new Job();

 FileInputFormat.setInputPath(job, inDir);

//outDir is the root path, in this case, outDir="/home/user/data/"
 FileOutputFormat.setOutputPath(job, outDir);

//You have to assign the output formatclass.Using MultipleOutputs in this way will still create zero-sized default output, eg part-00000. To prevent this use LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); instead of job.setOutputFormatClass(TextOutputFormat.class); in your Hadoop job configuration.

LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); 

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(Text.class);

 job.setMapperClass(MOMap.class);

 job.setReducerClass(MOReduce.class);

 ...

 job.waitForCompletion(true);

在减速机中的用法:

private MultipleOutputs out;

 public void setup(Context context) {

   out = new MultipleOutputs(context);

   ...

 }

 public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {

//'/' characters in baseOutputPath will be translated into directory levels in your file system. Also, append your custom-generated path with "part" or similar, otherwise your output will be -00000, -00001 etc. No call to context.write() is necessary.
 for (Text line : values) {

    if(line == type1)
      out.write(key, new Text(line),"file1/part");

  else  if(line == type2)
      out.write(key, new Text(line),"file2/part");

 else   if(line == type3)
      out.write(key, new Text(line),"file3/part");
   }
 }

 protected void cleanup(Context context) throws IOException, InterruptedException {
       out.close();
   }

参考:https ://hadoop.apache.org/docs/r2.6.3/api/org/apache/hadoop/mapreduce/lib/output/MultipleOutputs.html

于 2016-07-06T07:58:11.953 回答