我使用 Scio 编写了一个 Apache Beam 作业,目的是为传入的数据记录生成会话 ID,然后以某种方式丰富它们,然后再将它们输出到 BigQuery。这是代码:
val measurements = sc.customInput("ReadFromPubsub",
PubsubIO
.readMessagesWithAttributes()
.withTimestampAttribute("ts")
.fromSubscription(subscription)
)
measurements
.map(extractMeasurement).flatMap {
case Success(event) =>
Some(event)
case Failure(ex) =>
None
}
.timestampBy(_.timestamp)
.withSessionWindows(sessionGap, WindowOptions(
trigger = Repeatedly.forever(AfterWatermark.pastEndOfWindow()),
accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES,
allowedLateness = Duration.standardDays(1),
timestampCombiner = TimestampCombiner.END_OF_WINDOW
))
.keyBy(_.clientID)
.groupByKey
.toWindowed
.map(assignSessionID)
.toSCollection.flatMap(_.results)
.map(enrich)
.saveAsTypedBigQuery(output, bigquery.WRITE_APPEND, bigquery.CREATE_NEVER)
我正在使用事件时间戳,它是 中的属性键的值ts
,PubsubMessage
作为我的时间戳属性。.timestampBy
这与我在窗口化数据之前使用的时间戳完全相同。我期望的是一旦水印超过 sessionGap(默认 30 分钟),输出触发器就会触发。
使用 Dataflow 运行器和 DirectRunner 时,触发器永远不会触发,即使我模拟时间戳间隔超过 30 分钟的数据。在 Dataflow UI 中,我可以看到水印永远不会根据事件时间戳前进,而是每隔一分钟前进一次,就好像没有收到数据一样。
我已经验证了数据是实际收到的,因为在窗口化之前执行了转换。我还测试了每秒大约 10 条记录,但也许这仍然不足以更新水印?我还设置了一个 JobTest,在其中我得到了预期的输出,同时也向我表明问题是基于时间戳/水印的。
我确定我错过了文档中的重要内容或在某处犯了愚蠢的错误,并希望有人能指出我正确的方向。