1

我有以下代码来计算来自 socketTextStream 的单词。需要累积字数和时间窗口字数。该程序存在一个问题,即 cumulateCounts 始终与窗口计数相同。为什么会出现这个问题?根据窗口计数计算累积计数的正确方法是什么?

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final HashMap<String, Integer> cumulateCounts = new HashMap<String, Integer>();

final DataStream<Tuple2<String, Integer>> counts = env
            .socketTextStream("localhost", 9999)
            .flatMap(new Splitter())
            .window(Time.of(5, TimeUnit.SECONDS))
            .groupBy(0).sum(1)
            .flatten();

counts.print();

counts.addSink(new SinkFunction<Tuple2<String, Integer>>() {
    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        String word = value.f0;
        Integer delta_count = value.f1;
        Integer count = cumulateCounts.get(word);
        if (count == null)
            count = 0;
        count = count + delta_count;
        cumulateCounts.put(word, count);
        System.out.println("(" + word + "," + count.toString() + ")");
    }
});
4

1 回答 1

4

您应该首先分组,并将窗口应用于键控数据流(您的代码在 Flink 0.9.1 上工作,但 Flink 0.10.0 中的新 API 对此非常严格):

final DataStream<Tuple2<String, Integer>> counts = env
        .socketTextStream("localhost", 9999)
        .flatMap(new Splitter())
        .groupBy(0)
        .window(Time.of(5, TimeUnit.SECONDS)).sum(1)
        .flatten();

如果在非键控数据流上应用一个窗口,那么在单台机器上将只有一个线程窗口操作符(即没有并行性)来在整个流上构建窗口(在 Flink 0.9.1 中,这个全局窗口可以通过groupBy()-- 但是,在 Flink 0.10.0 中,这将不再起作用)。要计算单词,您需要为每个不同的键值构建一个窗口,即,您首先获取每个键值的子流(通过groupBy())并在每个子流上应用一个窗口运算符(因此,您可以有一个自己的窗口运算符每个子流的实例,允许并行执行)。

对于全局(累积)计数,您可以简单地应用groupBy().sum()构造。首先,流被分成子流(每个键值一个)。其次,您计算流上的总和。因为流不是窗口化的,所以每个传入元组的计算(累积)和更新(更详细地说,总和的初始结果值为零,并且每个元组的结果更新为result += tuple.value)。每次调用 sum 后,都会发出新的当前结果。

在您的代码中,您不应使用特殊的 sink 函数,而应执行以下操作:

counts.groupBy(0).sum(1).print();
于 2015-10-30T23:41:28.740 回答