3

我们有一个 PubSub 主题,其中包含进入 BigQuery 的事件(尽管特定数据库在这里几乎无关紧要)。事件可能带有新的未知属性,这些属性最终应该作为单独的 BigQuery 列结束。

所以,基本上我在这里有两个问题:

  1. 在 Pipeline 中维护全局状态的正确方法是什么(在我的例子中是一组遇到的属性)?
  2. 一旦遇到新属性并执行直到ALTER TABLE执行,什么是缓冲/保持事件流的好策略

现在我尝试使用以下(我正在使用 Spotify scio):

rows
  .withFixedWindows(Duration.millis(duration))
  .withWindow[IntervalWindow]
  .swap
  .groupByKey
  .map { case (window, rowsIterable) =>
    val newRows = findNewProperties(rowsIterable)
    mutateTableWith(newRows)
    rowsIterable
  }
  .flatMap(id)
  .saveAsBigQuery()

但这非常低效,因为我们至少需要将整个加载rowsIterable到内存中,甚至遍历它。

4

1 回答 1

4

我们正在构建相同的项目,并且我们正在使用包含模式的令人耳目一新的侧面输入遵循这种方法(从 BQ 间隔刷新)。所以基本上:

  1. 在侧面输入从 BQ 加载模式
  2. 使用流模式将数据流式传输到 BQ,这样您就可以对无法插入的行执行其他操作(即:当它们具有新的未知属性时)
  3. 将那些失败的那些保存在其他地方(数据存储?)以便以后处理它们(例如,在另一个工作中)
  4. 该恢复作业将发出架构更改,最终将由主管道刷新侧输入(步骤 1)加载。

我有一个使用令人耳目一新的侧面输入方法的工作示例here

于 2018-06-01T08:22:59.363 回答