3

Apache Flink 为 DataSet 提供了很多操作。很难理解集群中的数据是如何处理的。例如 WordCount 有不同的实现。有什么区别?

如果有一些文档来解释集群中这些工具的数据流是什么,那将非常有帮助。

    // get input data
    DataSet<String> text = env.fromElements(
            "To be, or not to be,--that is the question:--",
            "Whether 'tis nobler in the mind to suffer",
            "The slings and arrows of outrageous fortune",
            "Or to take arms against a sea of troubles,"
            );
    // WordCount 1
    text.flatMap(new LineSplitter()).groupBy(0).sum(1).print();

    // WordCount 2
    text.flatMap(new LineSplitter()).groupBy(0).aggregate(Aggregations.SUM, 1).print();

    // WordCount 3
    text.flatMap(new LineSplitter()).groupBy(0)
            .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                    return new Tuple2<String, Integer>(t1.f0, t1.f1+t2.f1);
            }
        }).print();

    // WordCount 4
    text.flatMap(new LineSplitter()).groupBy(0)
            .reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                @Override
                public void reduce(Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
                    int prefixSum = 0;
                    String key = null;
                    for (Tuple2<String, Integer> t : iterable) {
                        prefixSum += t.f1;
                        key = t.f0;
                    }
                    collector.collect(new Tuple2<String, Integer>(key, prefixSum));
            }
        }).print();

    // WordCount 5
    text.flatMap(new LineSplitter())
        .reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public void reduce(Iterable<Tuple2<String, Integer>> iterable, Collector<Tuple2<String, Integer>> collector) throws Exception {
                HashMap<String, Integer> map = new HashMap<String, Integer>();
                for(Tuple2<String, Integer> t : iterable){
                    if(map.containsKey(t.f0)){
                        map.replace(t.f0, map.get(t.f0)+t.f1);
                    } else {
                        map.put(t.f0, t.f1);
                    }
                }
                for(Map.Entry<String, Integer> pair : map.entrySet()){
                    collector.collect(new Tuple2<String, Integer>(pair.getKey(), pair.getValue()));
                }
            }
        }).print();
4

1 回答 1

4

除了 WordCount 5,所有程序的执行都非常类似于常规的 MapReduce WordCount 程序(基于散列的 shuffle 和基于排序的分组)。

  • WordCount 1 是 WordCount 2 的语法糖
  • WordCount 2 在内部使用GroupReduceFunction类似于 WordCount 4 中的 a 执行。唯一的区别是内部GroupReduceFunction实现了Combinable接口以支持部分聚合。
  • WordCount 3 使用ReduceFunction类似于 a 执行的 a GroupReduceFunction。但是,由于接口不同,aReduceFunction总是可以组合的(不需要单独的combine方法)。
  • WordCount 4 的执行与常规 MapReduce 程序一样:使用散列分区和基于排序的分组进行混洗。因为GroupReduceFunction没有实现Combinable接口,所以这个程序在没有本地预聚合的情况下执行,因此效率低于前三个程序。
  • WordCount 5 效率非常低,不应该使用,因为GroupReduceFunction不能并行执行。由于没有groupBy()调用,所有数据都被发送到同一个 Reducer 并作为一个大组处理。首先,这会很慢,因为它是在单线程中执行的,并且受到单台机器的网络吞吐量的限制。其次,如果不同键的数量变得太大,这个程序很容易失败,因为分组是使用 in-memory 完成的HashMap
于 2015-10-25T08:27:13.870 回答