在测试 Apache Flink 的流处理（Streaming）API 时，我通过一个简单的例子发现了一些奇怪的现象。
这段代码使用长度为 10 秒、每 5 秒滑动一次的窗口来统计单词出现的次数。在最初的 10 秒内，计数看起来是正确的；但在此之后，每次打印的结果都不对——每次只输出一个单词，且计数为 1。是我的代码有问题吗？
/** Endless word source for the demo job.
  *
  * About every 300 ms it picks one word uniformly at random from a fixed three-word
  * vocabulary and hands it to `ctx`. There is no exit condition: the loop runs until
  * an exception escapes it (for example, `Thread.sleep` throwing when the thread is
  * interrupted).
  *
  * @param ctx source context that receives each generated word through `collect`
  */
def generateWords(ctx: SourceContext[String]): Unit = {
  val vocabulary = List("amigo", "brazo", "pelo")
  Iterator
    .continually {
      // Throttle generation so the stream produces a few words per second.
      Thread.sleep(300)
      vocabulary(Random.nextInt(vocabulary.length))
    }
    .foreach(word => ctx.collect(word))
}
// Job setup: obtain the execution environment and run everything with parallelism 1.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

// Feed the job from the random-word generator defined above.
val stream = env.addSource(generateWords _)

// Pair every word with a count of 1, then apply a 10-second window that is
// evaluated every 5 seconds (so consecutive windows overlap).
val windowedStream = stream
  .map(word => (word, 1))
  .window(Time.of(10, SECONDS))
  .every(Time.of(5, SECONDS))

// Inside each window, group the pairs by the word field ("_1") and add up the
// count field ("_2").
val wordCount = windowedStream
  .groupBy("_1")
  .sum("_2")

// Turn the windowed result back into a regular stream and print it.
wordCount.getDiscretizedStream().print()

// Nothing above runs until the job is submitted here.
env.execute("sum randoms")
输出如下：
[(pelo,3), (brazo,1), (amigo,2)] // first 5 seconds
[(pelo,9), (brazo,5), (amigo,9)] // first 10 seconds
[(brazo,1)]
[(amigo,1)]