Possible duplicate:
Sharing data in a Hadoop MapReduce chain

I have a MapReduce chain as described below:

Job1 (Map1 -> Reduce1) --> Job2 (Map2 -> Reduce2), with Job1.waitForCompletion(true)

I need a value (say int a, produced by Reduce1) inside Map2. How can I do that? Please share your ideas.
You can use ChainMapper and ChainReducer. Here is some sample code to help you.
Configuration conf = getConf();
JobConf job = new JobConf(conf);

// Map1: chained before the reducer
// (the old API uses ChainMapper.addMapper; there is no setMapper method)
JobConf conf1 = new JobConf(false);
ChainMapper.addMapper(job, Map1.class,
        LongWritable.class, Text.class,   // input key/value types
        Text.class, Text.class,           // output key/value types
        true, conf1);

// Reduce1: the single reducer of the chain
JobConf conf2 = new JobConf(false);
ChainReducer.setReducer(job, Reduce1.class,
        Text.class, Text.class,
        Text.class, Text.class,
        true, conf2);

// Map2: chained after the reducer, via ChainReducer.addMapper
JobConf conf3 = new JobConf(false);
ChainReducer.addMapper(job, Map2.class,
        Text.class, Text.class,
        Text.class, Text.class,
        true, conf3);

// A chained job can have only one reducer, so Reduce2 has to run in a
// separate follow-up job rather than being chained here.
Note:
The output key/value types of each stage determine which mapper or reducer can be called next. So the output types of Map1 must match the input types of Reduce1, the output types of Reduce1 must match the input types of Map2, and the output types of Map2 must match the input types of Reduce2.
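The type-matching constraint above can be sketched in plain Java (no Hadoop) using function composition: the chain only compiles when each stage's output type matches the next stage's input type, which is the same rule ChainMapper/ChainReducer enforce at runtime. The stage names here are illustrative.

```java
import java.util.function.Function;

public class ChainTypes {
    public static void main(String[] args) {
        Function<Long, String> map1    = n -> "key-" + n;      // Map1: LongWritable -> Text
        Function<String, String> red1  = s -> s.toUpperCase(); // Reduce1: Text -> Text
        Function<String, Integer> map2 = String::length;       // Map2: Text -> IntWritable

        // andThen only type-checks because each output type matches the next input type
        Function<Long, Integer> chain = map1.andThen(red1).andThen(map2);
        System.out.println(chain.apply(42L)); // prints 6, the length of "KEY-42"
    }
}
```

If, say, map2 were declared as `Function<Integer, String>`, the `andThen` call would fail to compile, just as a mismatched chain fails in Hadoop.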
You can use a counter in Job1's Reduce1 to get a value out of Job1 and then pass it to Job2. Here is sample code for the flow you need to implement.
1. Sample code that sets the value using a counter:
public class Reduce1 /* extends/implements your Reducer type */ {
    public static enum COUNTER {
        INTVALUE
    };

    public void reduce(/* ... */) {
        // Old API
        reporter.incrCounter(COUNTER.INTVALUE, 1);
        // New API
        context.getCounter(COUNTER.INTVALUE).increment(1);
    }
}
2. Fetch the counter from Job1 and set it in Job2's JobConf, so the mapper can read the same value:
main()
{
    // .....
    RunningJob job1 = JobClient.runJob(conf1); // blocks until Job1 completes
    Counters c = job1.getCounters();
    long value = c.getCounter(COUNTER.INTVALUE);

    // Now set the value in Job2's configuration
    JobConf conf2 = new JobConf(conf);
    conf2.setInt("name", (int) value);
}
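The handoff in step 2 can be sketched without a cluster by standing in a plain map for Hadoop's JobConf: the driver copies the counter value from Job1 into Job2's configuration under a key, and Map2 later reads that key back. All names here ("name", the value 7) are illustrative.

```java
import java.util.HashMap;
import java.util.Map;

public class CounterHandoff {
    public static void main(String[] args) {
        long counterFromJob1 = 7;                       // stands in for job1.getCounters().getCounter(...)
        Map<String, String> job2Conf = new HashMap<>(); // stands in for Job2's JobConf
        job2Conf.put("name", Long.toString(counterFromJob1)); // conf2.setInt("name", value)

        // Inside Map2.configure(JobConf job): value = job.getInt("name", 0)
        int value = Integer.parseInt(job2Conf.getOrDefault("name", "0"));
        System.out.println(value); // prints 7
    }
}
```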
3. Map2 reads the value that came from Job1's counter via Job2's JobConf:
public class Map2 /* extends/implements your Mapper type */ {
    private int value;

    @Override
    public void configure(JobConf job) {
        value = job.getInt("name", 0);
    }

    @Override
    public void map(Text key, Text val, // renamed from 'value' to avoid shadowing the field
            OutputCollector<LongWritable, Text> output, Reporter reporter)
            throws IOException {
        // use 'value' here
    }
}
----------
Save Reduce1's output to a flat file on HDFS, read that file while setting up the driver of the second job, and then set the variable in the configuration.
// Read the reducer output from the file and set it as the "name" value.
// (Set it on conf before constructing the JobConf, since JobConf copies conf.)
Configuration conf = getConf();
conf.setInt("name", valueReadFromFile); // valueReadFromFile: the int parsed from Reduce1's output
JobConf job2 = new JobConf(conf);
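Reading the value back out of the flat file might look like the sketch below, assuming Reduce1 wrote it with TextOutputFormat as a `key<TAB>value` line (e.g. `a	42`). The file name and the key name "name" are assumptions; on a real cluster you would open the file through Hadoop's FileSystem API rather than java.nio.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class ReadReducerOutput {
    // Parse the int from a line like "a\t42" (TextOutputFormat's key\tvalue layout)
    static int readValue(Path part) throws IOException {
        String line = Files.readAllLines(part).get(0);
        return Integer.parseInt(line.split("\t")[1].trim());
    }

    public static void main(String[] args) throws IOException {
        // Simulate Reduce1's output file locally for the sketch
        Path part = Files.createTempFile("part-r-00000", "");
        Files.write(part, "a\t42\n".getBytes());

        int value = readValue(part);
        System.out.println(value); // prints 42
        // conf.setInt("name", value);  // then pass it into Job2's Configuration
    }
}
```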
In the mapper (Map2):
public class Map2 /* extends/implements your Mapper type */ {
    private int value;

    @Override
    public void configure(JobConf job) {
        value = job.getInt("name", 0);
    }

    @Override
    public void map(Text key, Text val, // renamed from 'value' to avoid shadowing the field
            OutputCollector<LongWritable, Text> output, Reporter reporter)
            throws IOException {
        // use 'value' here
    }
}