2

我使用 Scio 编写了一个 Apache Beam 作业,目的是为传入的数据记录生成会话 ID,然后以某种方式丰富它们,然后再将它们输出到 BigQuery。这是代码:

val measurements = sc.customInput("ReadFromPubsub",
  PubsubIO
    .readMessagesWithAttributes()
    .withTimestampAttribute("ts")
    .fromSubscription(subscription)
)

measurements
    .map(extractMeasurement).flatMap {
      case Success(event) =>
        Some(event)
      case Failure(ex) =>
        None
    }
    .timestampBy(_.timestamp)
    .withSessionWindows(sessionGap, WindowOptions(
      trigger = Repeatedly.forever(AfterWatermark.pastEndOfWindow()),
      accumulationMode = AccumulationMode.DISCARDING_FIRED_PANES,
      allowedLateness = Duration.standardDays(1),
      timestampCombiner = TimestampCombiner.END_OF_WINDOW
    ))
    .keyBy(_.clientID)
    .groupByKey
    .toWindowed
    .map(assignSessionID)
    .toSCollection.flatMap(_.results)
    .map(enrich)
    .saveAsTypedBigQuery(output, bigquery.WRITE_APPEND, bigquery.CREATE_NEVER)

我正在使用事件时间戳,它是 中的属性键的值tsPubsubMessage作为我的时间戳属性。.timestampBy这与我在窗口化数据之前使用的时间戳完全相同。我期望的是一旦水印超过 sessionGap(默认 30 分钟),输出触发器就会触发。

使用 Dataflow 运行器和 DirectRunner 时,触发器永远不会触发,即使我模拟时间戳间隔超过 30 分钟的数据。在 Dataflow UI 中,我可以看到水印永远不会根据事件时间戳前进,而是每隔一分钟前进一次,就好像没有收到数据一样。

我已经验证了数据是实际收到的,因为在窗口化之前执行了转换。我还测试了每秒大约 10 条记录,但也许这仍然不足以更新水印?我还设置了一个 JobTest,在其中我得到了预期的输出,同时也向我表明问题是基于时间戳/水印的。

我确定我错过了文档中的重要内容或在某处犯了愚蠢的错误,并希望有人能指出我正确的方向。

4

2 回答 2

1

将消息发布到 pubsub 时,您如何生成写入消息“ts”属性的时间戳,以及如何对它们进行编码?

如果我没记错的话,时间戳必须编码为 RFC3339 规范,例如像这样的“2020-10-02T10:00:00-05:00”

您可以尝试的另一件事是暂时删除“.withTimestampAttribute("ts")”行,以便自动生成使用的时间戳。然后验证您的水印是否正在推进。如果是这样,这表明时间戳值(例如,这些值可能不是您所期望的)或其编码存在问题。

最后,如果使用云数据流运行器,请查看作业状态页面。这应该向您显示数据水印的当前值。您可以检查它是否符合您的期望。

于 2020-05-21T18:23:29.273 回答
0

您可以尝试向 AfterWatermark.pastEndofWindows 添加早期和延迟触发,以查看水印是否已更新,并检查是否有任何延迟到达的数据。您还可以在此处找到有关触发器的文档。

于 2018-06-29T19:49:00.457 回答