我正在构建一个管道来将 PubSub 中的数据备份到 GCS 中,并希望使用它创建一个测试JobTest
,我正在努力让 PubSubIO 正确获取事件时间。
PubSub 使用sc.pubsubSubscriptionWithAttributes[String]("path/to/subscription", timestampAttribute = "doc_timestamp")
. 在此之后,我应用窗口并将其发送到CustomIO
测试看起来像这样:
JobTest[PubSub2GCS.type]
.args("--subscription=input", "--targetDir=output")
.input(PubsubIO[(String, Map[String, String])]("input"), Seq(("Contents", Map[String, String]("doc_timestamp" -> "2001-01-01T09:10:11.332Z"))))
.output(CustomIO[KV[String, WindowedDoc]]("output"))(_.debug())
.run()
结果是值被放在了-290308-12-21T20:00:00.000Z..-290308-12-21T21:00:00.000Z
窗口中!!。可能是因为"doc_timestamp"
没有正确解释日期。实际上,无论"doc_timestamp"
键上的值如何,窗口都不会改变。
幸运的是,这项工作在生产中运行时运行良好,但我想编写这个测试。