我用下面的代码统计来自 socketTextStream 的单词出现次数,需要同时得到每个单词的累计计数和每个时间窗口内的计数。程序存在一个问题:cumulateCounts 中的累计计数始终与窗口计数相同。为什么会出现这个问题?基于窗口计数来计算累计计数的正确方法是什么?
// Running total per word, updated by the sink attached to `counts` further down.
final HashMap<String, Integer> cumulateCounts = new HashMap<String, Integer>();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Per-window word counts:
//   1. read text from a socket at localhost:9999,
//   2. run each record through Splitter (defined elsewhere; presumably emits
//      (word, 1) tuples -- confirm against its implementation),
//   3. cut the stream into 5-second windows,
//   4. within each window, group by tuple field 0 and sum tuple field 1,
//   5. flatten the windowed result back into a plain DataStream.
final DataStream<Tuple2<String, Integer>> counts =
    env.socketTextStream("localhost", 9999)
        .flatMap(new Splitter())
        .window(Time.of(5, TimeUnit.SECONDS))
        .groupBy(0)
        .sum(1)
        .flatten();

// Emit the per-window counts to standard output.
counts.print();
// Folds the per-window counts into a running total per word and prints every
// updated total as "(word,total)".
//
// The sink is deliberately pinned to parallelism 1. The anonymous SinkFunction
// captures `cumulateCounts`; Flink serializes the function and hands a separate
// copy (with its own copy of the map) to every parallel sink instance. With the
// default parallelism, window results for the same word can land on different
// instances, so each lookup usually misses and the printed "cumulative" value is
// just the window count. A single sink instance sees every window result and
// therefore accumulates correctly.
//
// NOTE(review): for a scalable job, partition the stream by word before a
// stateful operator instead of funnelling everything through one sink instance.
counts.addSink(new SinkFunction<Tuple2<String, Integer>>() {
    /**
     * Adds one window's count for a word to that word's running total.
     *
     * @param value tuple whose f0 is the word and f1 is the count to add
     */
    @Override
    public void invoke(Tuple2<String, Integer> value) throws Exception {
        String word = value.f0;
        Integer windowCount = value.f1;

        Integer total = cumulateCounts.get(word);
        if (total == null) {
            // First time this word is seen: start from zero.
            total = 0;
        }
        total = total + windowCount;

        cumulateCounts.put(word, total);
        System.out.println("(" + word + "," + total + ")");
    }
}).setParallelism(1);