0

我正在使用Spotify Scio创建一个由Pub/Sub消息触发的 scala Dataflow 管道。它从我们的私有中读取DB,然后将信息插入到BigQuery.

问题是:

  • 我需要删除以前的数据
  • 为此,我需要使用 write dispositionWRITE_TRUNCATE
  • 但是,该作业自动注册为流式传输,因此出现以下错误:WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection
  • 所以我需要手动将管道改为Batch管道,指定触发频率。

所以直到现在我有以下管道:

sc
  .customInput("Job Trigger", inputIO)
  .map(handleUserInformationRetrieval(dbOperationTimeout, projectName))
  .flatten
  .withGlobalWindow(options = windowOptions(windowingOutputTriggerDuration))
  .groupBy(_.ssoId)
  .map { case (ssoId, userDataCollection) => Schemas.toTableRow(ssoId, userDataCollection) }
  .filter(_.isSuccess)
  .map(_.get)
  .saveAsBigQuery(tableName, getSchema, WRITE_TRUNCATE, CREATE_NEVER)

scio当我使用api ( saveAsBigQuery)时,我似乎找不到指定触发频率的方法。

它仅存在于本机beamapi 中:

BigQueryIO
  .write()
  .withTriggeringFrequency(Duration.standardDays(1)) // This is what I'm after
  .to(bqTableName)
  .withSchema(getSchema)
  .withCreateDisposition(CREATE_NEVER)
  .withWriteDisposition(WRITE_TRUNCATE)

如果我使用BigQueryIO我将不得不使用sc.pipeline.apply而不是我当前的管道。

有没有办法以某种方式将其集成BigQueryIO到我当前的管道或以某种方式withTriggeringFrequency在当前管道上指定?

4

1 回答 1

0

Scio 目前不支持指定用于将数据加载到 Big Query 的方法。由于这是不可能的,因此自动STREAMING_INSERTS用于无界集合,这显然不支持截断。因此,您需要回退到 BeamBigQueryIO指定触发频率 ( withTriggeringFrequency(...)) 和方法 ( withMethod(Method.FILE_LOADS))。

要将其集成到您的 Scio 管道中,您只需使用saveAsCustomOutput. 也可以在这里找到一个示例:https ://spotify.github.io/scio/io/Type-Safe-BigQuery#using-type-safe-bigquery-directly-with-beams-io-library

于 2019-06-17T12:33:50.740 回答