1

我正在构建一个管道来将 PubSub 中的数据备份到 GCS 中,并希望使用它创建一个测试JobTest,我正在努力让 PubSubIO 正确获取事件时间。

PubSub 使用sc.pubsubSubscriptionWithAttributes[String]("path/to/subscription", timestampAttribute = "doc_timestamp"). 在此之后,我应用窗口并将其发送到CustomIO

测试看起来像这样:

JobTest[PubSub2GCS.type]
  .args("--subscription=input", "--targetDir=output")
  .input(PubsubIO[(String, Map[String, String])]("input"), Seq(("Contents", Map[String, String]("doc_timestamp" -> "2001-01-01T09:10:11.332Z"))))
  .output(CustomIO[KV[String, WindowedDoc]]("output"))(_.debug())
  .run()

结果是值被放在了-290308-12-21T20:00:00.000Z..-290308-12-21T21:00:00.000Z窗口中!!。可能是因为"doc_timestamp"没有正确解释日期。实际上,无论"doc_timestamp"键上的值如何,窗口都不会改变。

幸运的是,这项工作在生产中运行时运行良好,但我想编写这个测试。

4

1 回答 1

3

这是因为Map[String, String]中的属性ScioContext#pubsubSubscriptionWithAttributes未填充JobTest

我们可以在这里添加一个条件,并设置时间戳 ifScioContext#isTesttimestampAttribute != null https://github.com/spotify/scio/blob/master/scio-core/src/main/scala/com/spotify/scio/ScioContext.scala#L572

似乎是一个琐事修复。您能在这里提出问题并提交 PR 吗?

于 2018-09-21T09:48:05.580 回答