我们有一个 PubSub 主题,其中包含进入 BigQuery 的事件(尽管特定数据库在这里几乎无关紧要)。事件可能带有新的未知属性,这些属性最终应该作为单独的 BigQuery 列结束。
所以,基本上我在这里有两个问题:
- 在 Pipeline 中维护全局状态的正确方法是什么(在我的例子中是一组遇到的属性)?
- 一旦遇到新属性并执行直到
ALTER TABLE
执行,什么是缓冲/保持事件流的好策略
现在我尝试使用以下(我正在使用 Spotify scio):
rows
.withFixedWindows(Duration.millis(duration))
.withWindow[IntervalWindow]
.swap
.groupByKey
.map { case (window, rowsIterable) =>
val newRows = findNewProperties(rowsIterable)
mutateTableWith(newRows)
rowsIterable
}
.flatMap(id)
.saveAsBigQuery()
但这非常低效,因为我们至少需要将整个加载rowsIterable
到内存中,甚至遍历它。