10

我在理解围绕事件时间窗口的语义时遇到了一些麻烦。以下程序生成一些带有时间戳的元组,这些时间戳用作事件时间并进行简单的窗口聚合。我希望输出与输入的顺序相同,但输出的顺序不同。为什么输出相对于事件时间无序?

import java.util.concurrent.TimeUnit
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.scala._

object WindowExample extends App {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.enableTimestamps()
    env.setParallelism(1)

    val start = 1449597577379L
    val tuples = (1 to 10).map(t => (start + t * 1000, t))

    env.fromCollection(tuples)
      .assignAscendingTimestamps(_._1)
      .timeWindowAll(Time.of(1, TimeUnit.SECONDS))
      .sum(1)
      .print()

    env.execute()
}

输入:

 (1449597578379,1)
 (1449597579379,2)
 (1449597580379,3)
 (1449597581379,4)
 (1449597582379,5)
 (1449597583379,6)
 (1449597584379,7)
 (1449597585379,8)
 (1449597586379,9)
 (1449597587379,10)

结果:

[info] (1449597579379,2)
[info] (1449597581379,4)
[info] (1449597583379,6)
[info] (1449597585379,8)
[info] (1449597587379,10)
[info] (1449597578379,1)
[info] (1449597580379,3)
[info] (1449597582379,5)
[info] (1449597584379,7)
[info] (1449597586379,9)
4

1 回答 1

12

这种行为的原因是在 Flink 中没有考虑元素的顺序(相对于时间戳)。只有水印的正确性及其与元素时间戳的关系对于考虑时间的操作很重要,因为水印通常会触发基于时间的操作中的计算。

在您的示例中,窗口运算符将源中的所有元素存储在内部窗口缓冲区中。然后,源发出一个水印,表示将来不会有具有较小时间戳的元素到达。这反过来告诉窗口操作员处理所有具有低于水印的结束时间戳的窗口(对于所有窗口都是如此)。因此,它发出所有窗口(具有任意顺序),然​​后它自己发出一个水印。其下游的操作本身将接收元素,并且一旦接收到水印就可以进行处理。

默认情况下,从源发出水印的时间间隔为 200 毫秒。对于您的源发出的少量元素,所有这些元素都会在发出第一个水印之前发出。在实际用例中,水印发射间隔远小于窗口大小,您将获得按时间戳顺序发射的窗口的预期行为。例如,如果您每 500 毫秒有 1 小时的窗口和水印。

于 2015-12-09T14:31:12.337 回答