2

我正在使用 MapReduce 框架用 Java 制作一个 Hadoop 应用程序。

我只对输入和输出使用 Text 键和值。在减少到最终输出之前,我使用组合器进行额外的计算步骤。

但我有一个问题,钥匙不去同一个减速器。我在组合器中创建并添加这样的键/值对:

public static class Step4Combiner extends Reducer<Text,Text,Text,Text> {
    private static Text key0 = new Text();
    private static Text key1 = new Text();

        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                key0.set("KeyOne");
                key1.set("KeyTwo");
                context.write(key0, new Text("some value"));
                context.write(key1, new Text("some other value"));
        }

}   

public static class Step4Reducer extends Reducer<Text,Text,Text,Text> {

            public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                System.out.print("Key:" + key.toString() + " Value: ");
                String theOutput = "";
                for (Text val : values) {
                    System.out.print("," + val);
                }
                System.out.print("\n");

                context.write(key, new Text(theOutput));
            }

}

我主要创建这样的工作:

Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

Job job4 = new Job(conf, "Step 4");
job4.setJarByClass(Step4.class);

job4.setMapperClass(Step4.Step4Mapper.class);
job4.setCombinerClass(Step4.Step4Combiner.class);
job4.setReducerClass(Step4.Step4Reducer.class);

job4.setInputFormatClass(TextInputFormat.class);
job4.setOutputKeyClass(Text.class);
job4.setOutputValueClass(Text.class);

FileInputFormat.addInputPath(job4, new Path(outputPath));
FileOutputFormat.setOutputPath(job4, new Path(finalOutputPath));            

System.exit(job4.waitForCompletion(true) ? 0 : 1);

从减速器打印的标准输出中的输出是这样的:

Key:KeyOne Value: ,some value
Key:KeyTwo Value: ,some other value
Key:KeyOne Value: ,some value
Key:KeyTwo Value: ,some other value
Key:KeyOne Value: ,some value
Key:KeyTwo Value: ,some other value

这是没有意义的,因为键是相同的,因此它应该是 2 个减速器,它的 Iterable 中有 3 个相同的值

希望你能帮助我深入了解这个:)

4

2 回答 2

4

这很可能是因为您的组合器同时在 map 和 reduce 阶段运行(一个鲜为人知的“功能”)。

基本上,您正在修改组合器中的键,它可能会运行也可能不会运行,因为映射输出在减速器中合并在一起。在运行组合器(reduce 端)之后,通过分组比较器馈送键以确定 Iterable 传递给 reduce 方法的值(我在这里绕过 reduce 阶段的流方面 - 不支持 iterable通过一组值或值列表,如果分组比较器确定当前键和最后一个键相同,则对 iterator().next() 的更多调用返回 true)

您可以尝试通过检查上下文来检测当前的组合器阶段(映射或减少)(有一种Context.getTaskAttempt().isMap()方法,但我对此也有一些记忆,甚至可能在某处有关于此的 JIRA 票证)。

最重要的是,不要修改组合器中的密钥,除非如果组合器正在运行减少端,您可以找到绕过此行为的方法。

编辑 所以调查@Amar的评论,我整理了一些代码(pastebin链接),其中添加了一些详细的比较器、组合器、reducer等。如果你运行一个映射作业,那么在reduce阶段没有组合器将运行,并且映射输出不会再次排序,因为它已经被假定为已排序。

假设它是排序的,因为它在被发送到组合器类之前已经排序,并且假设键不会被触及 - 因此仍然是排序的。请记住,组合器旨在组合给定键的值。

因此,使用单个映射和给定的组合器,reducer 会看到 KeyOne、KeyTwo、KeyOne、KeyTwo、KeyOne 顺序中的键。分组比较器看到它们之间的转换,因此您可以调用 6 次 reduce 函数

如果您使用两个映射器,那么化简器知道它有两个已排序的段(每个映射中的一个),因此在归约之前仍需要对它们进行排序 - 但由于段的数量低于阈值,因此排序是作为内联流排序(再次假定段已排序)。使用两个映射器(减少阶段的 10 条记录输出),您仍然是错误的输出。

再说一次,不要修改组合器中的密钥,这不是组合器的用途。

于 2013-01-02T21:04:32.747 回答
0

在组合器中试试这个:

context.write(new Text("KeyOne"), new Text("some value"));
context.write(new Text("KeyTwo"), new Text("some other value"));

我看到这种事情发生的唯一方法是,如果key0没有发现来自一个组合器的组合器与来自另一个组合器的组合器相等key0。我不确定在键指向完全相同的实例的情况下它会如何表现(如果您将键设为静态会发生这种情况)。

于 2013-01-02T19:55:45.493 回答