
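While testing the Apache Flink streaming API, I ran into something strange with a simple example.

This code counts words every 5 seconds over a 10-second window. For the first 10 seconds the counts look right, but after that every print shows a wrong count: one per word. Is there something wrong with my code?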

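import java.util.concurrent.TimeUnit.SECONDS
import scala.util.Random

// Source: emits one random word roughly every 300 ms, forever.
def generateWords(ctx: SourceContext[String]) = {
  val words = List("amigo", "brazo", "pelo")
  while (true) {
    Thread.sleep(300)
    ctx.collect(words(Random.nextInt(words.length)))
  }
}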

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setParallelism(1)

val stream = env.addSource(generateWords _)
val windowedStream = stream.map((_, 1))
  .window(Time of(10, SECONDS)).every(Time of(5, SECONDS))


val wordCount = windowedStream
  .groupBy("_1")
  .sum("_2")

wordCount
  .getDiscretizedStream()
  .print()

env.execute("sum randoms")

The output is:

[(pelo,3), (brazo,1), (amigo,2)] // first 5 seconds
[(pelo,9), (brazo,5), (amigo,9)] // first 10 seconds
[(brazo,1)] 
[(amigo,1)]
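
To make the expectation concrete, here is a minimal sketch in plain Scala (no Flink involved) of what a 10-second window sliding every 5 seconds would count. The `slidingCounts` helper and its millisecond timestamps are hypothetical, made up for illustration; this is not how Flink implements windows.

```scala
object SlidingCountSketch {
  // Hypothetical helper, not a Flink API: counts words per sliding window
  // over (timestampMillis, word) events.
  def slidingCounts(
      events: Seq[(Long, String)],
      sizeMs: Long,
      slideMs: Long
  ): Seq[Map[String, Int]] = {
    if (events.isEmpty) Seq.empty
    else {
      val lastTs = events.map(_._1).max
      // Windows end at slideMs, 2 * slideMs, ... until the last event is covered.
      val windowEnds = slideMs to (lastTs + slideMs) by slideMs
      windowEnds.map { end =>
        val start = end - sizeMs
        events
          .filter { case (ts, _) => ts >= start && ts < end } // window is [start, end)
          .groupBy(_._2)                                      // group by word
          .map { case (word, hits) => word -> hits.size }     // count per word
      }
    }
  }
}
```

Under this model, every window after the first two should still contain all the words seen in the preceding 10 seconds, so counts of around 30 events per window would be expected at one word every 300 ms, rather than a single `(word,1)` per print.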