0

可能的重复:
Hadoop Map Reduce 链接中的数据共享

我有如下所述的 map reduce 链。

Job1(Map1 -> Reduce 1) --> Job2(Map2, Reduce2) Job1.waitForCompletion(true)

我需要一个值(假设 int a ,由 Reduce 1 创建),在 Map2 中。

我怎样才能做到这一点 ??请分享你的想法

4

3 回答 3

1

您可以使用 ChainMapper 和 ChainReducer 。这是一个为您提供帮助的示例代码。



Configuration conf = getConf();
JobConf job = new JobConf(conf);
JobConf Conf1 = new JobConf(false);
ChainMapper.setMapper
(job,
Map1.class,
LongWritable.class,
Text.class,
Text.class,
Text.class,
true,
Conf1);


 JobConf Conf2 = new JobConf(false);
    ChainReducer.setReducer
    (job,
    Reduce1.class,
    Text.class,
    Text.class,
    Text.class,
    Text.class,
    true,
    Conf2);
JobConf Conf3 = new JobConf(false);
    ChainMapper.setMapper
    (job,
    Map2.class,
    Text.class,
    Text.class,
    Text.class,
    Text.class,
    true,
    Conf3);
JobConf Conf4 = new JobConf(false);
    ChainReducer.setReducer
    (job,
    Reduce2.class,
    Text.class,
    Text.class,
    Text.class,
    Text.class,
    true,
    Conf4);


笔记:

the out-put Type of  key-value derive which Mapper and reducer is to be called next so , the output Type of Map1 should me same as Input Type of key-value of Reduce1 AND the output Type of Reduce1 should me same as Input Type of key-value of Map2 and 
the output Type of Map2 should me same as Input Type of key-value of Reduce2
于 2012-11-02T10:27:46.103 回答
0

解决问题的计数器


您可以使用 Job1,Reduce1 中的计数器从 Job1 获取值,然后将其传递给 Job2 。这是需要编码的流事物的示例代码。

1.使用计数器设置值的示例代码

    Reducer()
{
public static enum COUNTER {
  INTVALUE
};

Reduce()
{
// Old API
reporter.incrCounter(COUNTER .INTVALUE, 1);

//NEW API
context.getCounter(COUNTER .INTVALUE).increment(1);

}

}


2.从job1中获取set counter,然后设置到Job2的JonConf中,mapper可以得到相同的值。


main()
{
// .....
jobclient1.submit(job1);
RunningJob job = JobClient1.runJob(conf);  // blocks until job completes
Counters c = job.getCounters();
int value= c.getCounter(COUNTER .INTVALUE);

// Now set the value in Job2 
Job job2 = new JobConf(conf);
job2.setInt("name", value);
}


3.Map2从Job1 counter->Jobconf2获取值


    mapper()
 {
    int value;

    @Override
    public void configure(JobConf job) {

        value=job.getInt("name", 0);
    }

    @Override
    public void map(Text key, Text value,
            OutputCollector<LongWritable, Text> output, Reporter arg3)
            throws IOException {


    }
}
于 2012-11-02T12:41:49.297 回答
0
----------

另一个答案


将 Reduce1 的输出保存到 flatfile(hdfs) 读取该文件,同时在 2nd job 中设置 driver(Job) 。然后在上下文中设置变量。


//read reducer output from file . and set it @name variable 
Configuration conf = getConf();
Job job = new JobConf(conf);
conf.setInt("name", 0000);

在映射器中(map2)

    mapper()
{
int value;

        @Override
        public void configure(JobConf job) {

            value=job.getInt("name", 0);
        }

        @Override
        public void map(Text key, Text value,
                OutputCollector<LongWritable, Text> output, Reporter arg3)
                throws IOException {


        }
}

于 2012-11-02T15:01:29.013 回答