我正在使用Spotify Scio
创建一个由Pub/Sub
消息触发的 scala Dataflow 管道。它从我们的私有中读取DB
,然后将信息插入到BigQuery
.
问题是:
- 我需要删除以前的数据
- 为此,我需要使用 write disposition
WRITE_TRUNCATE
- 但是,该作业自动注册为流式传输,因此出现以下错误:
WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection
- 所以我需要手动将管道改为
Batch
管道,指定触发频率。
所以直到现在我有以下管道:
sc
.customInput("Job Trigger", inputIO)
.map(handleUserInformationRetrieval(dbOperationTimeout, projectName))
.flatten
.withGlobalWindow(options = windowOptions(windowingOutputTriggerDuration))
.groupBy(_.ssoId)
.map { case (ssoId, userDataCollection) => Schemas.toTableRow(ssoId, userDataCollection) }
.filter(_.isSuccess)
.map(_.get)
.saveAsBigQuery(tableName, getSchema, WRITE_TRUNCATE, CREATE_NEVER)
scio
当我使用api ( saveAsBigQuery
)时,我似乎找不到指定触发频率的方法。
它仅存在于本机beam
api 中:
BigQueryIO
.write()
.withTriggeringFrequency(Duration.standardDays(1)) // This is what I'm after
.to(bqTableName)
.withSchema(getSchema)
.withCreateDisposition(CREATE_NEVER)
.withWriteDisposition(WRITE_TRUNCATE)
如果我使用BigQueryIO
我将不得不使用sc.pipeline.apply
而不是我当前的管道。
有没有办法以某种方式将其集成BigQueryIO
到我当前的管道或以某种方式withTriggeringFrequency
在当前管道上指定?