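I have chained two MapReduce jobs. Job1 has only one reducer, in which I compute a single float value. I want to use this value in the reducer of Job2. Here is my main method setup.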

public static String GlobalVriable;
public static void main(String[] args) throws Exception {

        int runs = 0;
        for (; runs < 10; runs++) {
            String inputPath = "part-r-000" + nf.format(runs);
            String outputPath = "part-r-000" + nf.format(runs + 1);
            MyProgram.MR1(inputPath);
            MyProgram.MR2(inputPath, outputPath);
        }
    }

    public static void MR1(String inputPath)
            throws IOException, InterruptedException, ClassNotFoundException {

        Configuration conf = new Configuration();
        conf.set("var1","");
        Job job = new Job(conf, "This is job1");
        job.setJarByClass(MyProgram.class);
        job.setMapperClass(MyMapper1.class);
        job.setReducerClass(MyReduce1.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FloatWritable.class);
        FileInputFormat.addInputPath(job, new Path(inputPath));
        job.waitForCompletion(true);
        GlobalVriable = conf.get("var1"); // I am getting NULL here
    }

    public static void MR2(String inputPath, String outputPath)
            throws IOException, InterruptedException, ClassNotFoundException {

        Configuration conf = new Configuration();
        Job job = new Job(conf, "This is job2");
        ...
    }

    public static class MyReduce1 extends
        Reducer<Text, FloatWritable, Text, FloatWritable> {

    public void reduce(Text key, Iterable<FloatWritable> values, Context context)
            throws IOException, InterruptedException {

        float s = 0;
        for (FloatWritable val : values) {
            s += val.get();
        }

        String sum = Float.toString(s);
        context.getConfiguration().set("var1", sum);
    }
}

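As you can see, I need to iterate the whole program many times. Job1 computes a single number from the input. Since it is just one number and there are many iterations, I don't want to write it to HDFS and read it back. Is there a way to share the value computed in MyReduce1 and use it in the reducer of Job2?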

Update: I tried passing the value with conf.set and conf.get. The value does not get passed.

3 Answers

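Here is a way to pass the float value back to the driver through a counter...

First, in the first reducer, convert the float value to a long by multiplying it by 1000 (for example, to keep 3 digits of precision) and put the result into a counter: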

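@Override
public void cleanup(Context context) {
    // floatValue is assumed to be a field of the reducer, filled in by reduce().
    // Math.round avoids truncating a value such as 2.9999998 down to 2.
    long result = Math.round(floatValue * 1000.0);
    context.getCounter("Result", "Result").increment(result);
}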

In the driver class, retrieve the long value and convert it back to a float:

public static void MR1(String inputPath)
        throws IOException, InterruptedException, ClassNotFoundException {

    Configuration conf = new Configuration();
    Job job = new Job(conf, "This is job1");
    job.setJarByClass(MyProgram.class);
    job.setMapperClass(MyMapper1.class);
    job.setReducerClass(MyReduce1.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(FloatWritable.class);
    FileInputFormat.addInputPath(job, new Path(inputPath));
    job.waitForCompletion(true);

    long result = job.getCounters().findCounter("Result","Result").getValue();
    float value = ((float)result) / 1000;

}
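
Scaling by 1000 keeps only three decimal places. As a rough sketch of the trade-off (not part of the answer above), the plain-Java class below compares that fixed-point encoding with a lossless alternative that carries the float's raw bit pattern in the long, using Float.floatToIntBits and Float.intBitsToFloat. The class and method names are hypothetical, and the bit-pattern variant assumes the counter is incremented exactly once, which fits the single-reducer setup described in the question.

```java
public class CounterEncoding {
    // Fixed-point encoding, as in the answer: keeps three decimal places.
    static long toFixedPoint(float value) {
        return Math.round(value * 1000.0);
    }

    static float fromFixedPoint(long counter) {
        return counter / 1000f;
    }

    // Lossless alternative (assumption, not from the answer): store the
    // IEEE 754 bit pattern. Only valid if the counter is incremented once,
    // because summing bit patterns is meaningless.
    static long toBits(float value) {
        return Float.floatToIntBits(value) & 0xFFFFFFFFL;
    }

    static float fromBits(long counter) {
        return Float.intBitsToFloat((int) counter);
    }

    public static void main(String[] args) {
        float original = 3.14159f;
        // Fixed-point round trip loses digits beyond the third decimal place.
        System.out.println(fromFixedPoint(toFixedPoint(original)));
        // Bit-pattern round trip returns the original value unchanged.
        System.out.println(fromBits(toBits(original)));
    }
}
```

In the reducer, the value returned by toFixedPoint or toBits would be what gets passed to increment, and the driver would apply the matching from... method to the counter value.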
answered 2012-12-01T21:24:11.330

You can use ZooKeeper for this. It is well suited to any inter-job coordination or message passing like this.

answered 2012-12-01T01:20:18.973

Can't you just change the return type of MR1 to int (or whatever data type is appropriate) and return the number you computed:

    int myNumber = MyProgram.MR1(inputPath);

Then add a parameter to MR2 and call it with the number you computed:

    MyProgram.MR2(inputPath, outputPath, myNumber);
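
Once the driver holds the number, it still has to reach Job2's reducer. In Hadoop the job configuration is copied out to the tasks, so a value set in the driver before the job is created (for example with conf.setFloat("var1", myNumber)) can be read in the reducer's setup method via context.getConfiguration().getFloat("var1", 0f), whereas values set inside a task are not sent back to the driver. The sketch below only simulates that one-way flow in plain Java: java.util.Properties stands in for the Hadoop Configuration, and runTask, ConfigFlowSketch and the "taskOnly" key are hypothetical names for illustration.

```java
import java.util.Properties;

public class ConfigFlowSketch {
    // Stand-in for a task: it works on its own copy of the driver's settings.
    static float runTask(Properties driverConf) {
        Properties taskConf = new Properties();
        taskConf.putAll(driverConf);            // copy made when the job is submitted
        taskConf.setProperty("taskOnly", "x");  // change stays local to the task
        return Float.parseFloat(taskConf.getProperty("var1", "0"));
    }

    public static void main(String[] args) {
        Properties conf = new Properties();
        float myNumber = 42.5f;                 // pretend MR1 returned this value
        conf.setProperty("var1", Float.toString(myNumber));

        // Driver -> task direction works: the task sees var1.
        System.out.println(runTask(conf));

        // Task -> driver direction does not: the driver never sees taskOnly.
        System.out.println(conf.getProperty("taskOnly")); // null
    }
}
```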
answered 2012-12-01T00:27:00.360