To do this with Scio, you can create a custom output transform that writes to destinations specified by a DynamicDestinations object (Apache Beam). The table is determined dynamically from some characteristic of the input element, in this case the element's event time (hour).
Custom output transform for BigQuery:
import com.google.api.services.bigquery.model.{TableRow, TableSchema}
import com.spotify.scio.bigquery.BigQueryUtil
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition
import org.apache.beam.sdk.io.gcp.bigquery._
import org.apache.beam.sdk.transforms.PTransform
import org.apache.beam.sdk.values.{PCollection, PDone, ValueInSingleWindow}

def saveAsBigQuery(tblPrefix: String,
                   tblSchema: String,
                   writeDisposition: WriteDisposition):
    PTransform[PCollection[TableRow], PDone] = {
  BigQueryIO.writeTableRows()
    .to(new DynamicDestinations[TableRow, String] {
      // Builds the concrete output table for a given destination key (the table suffix)
      override def getTable(tblSuffix: String): TableDestination = {
        // TODO: construct table name
        val tblName = "%s_%s".format(tblPrefix, tblSuffix)
        new TableDestination(tblName, null)
      }
      // Extracts the destination key for each element
      override def getDestination(tblRow: ValueInSingleWindow[TableRow]): String = {
        // TODO: determine hourly table suffix based on event time in tblRow object
        //       (see the sketch after this block)
      }
      // Every destination table shares the same schema
      override def getSchema(destination: String): TableSchema = {
        BigQueryUtil.parseSchema(tblSchema)
      }
    })
    .withWriteDisposition(writeDisposition)
    // cast so the result matches the PTransform[..., PDone] shape expected by saveAsCustomOutput
    .asInstanceOf[PTransform[PCollection[TableRow], PDone]]
}
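The getDestination TODO could be filled in along the following lines: Beam attaches an event-time timestamp to each element, which ValueInSingleWindow exposes via getTimestamp, so the suffix can be derived by formatting that timestamp per hour. A minimal sketch, assuming a UTC yyyyMMddHH suffix (the pattern is illustrative, not prescribed above):

// Sketch of a getDestination body that buckets elements by hour (assumes UTC, yyyyMMddHH suffix)
import org.joda.time.format.DateTimeFormat

override def getDestination(tblRow: ValueInSingleWindow[TableRow]): String = {
  // getTimestamp returns the element's event-time Instant assigned by Beam
  val hourlyFormat = DateTimeFormat.forPattern("yyyyMMddHH").withZoneUTC()
  hourlyFormat.print(tblRow.getTimestamp)
}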
Apply the custom output transform using the function above:
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write
val tblPrefix = "table_prefix"
val tblSchema = "table_schema" // TODO: this needs to be in valid BigQuery schema format (see the example after this snippet)
val writeDisposition = Write.WriteDisposition.WRITE_APPEND
val bqTransform = saveAsBigQuery(
tblPrefix,
tblSchema,
writeDisposition)
// assuming tblRows is an SCollection[TableRow]
tblRows.saveAsCustomOutput("saveAsBigQuery", bqTransform)
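For the tblSchema placeholder, BigQueryUtil.parseSchema takes the schema as a JSON string in BigQuery's standard TableSchema format. A sketch of what that could look like; the field names and types below are purely illustrative, not from the original:

val tblSchema =
  """
    |{
    |  "fields": [
    |    {"name": "event_time", "type": "TIMESTAMP", "mode": "REQUIRED"},
    |    {"name": "user_id",    "type": "STRING",    "mode": "NULLABLE"},
    |    {"name": "value",      "type": "FLOAT",     "mode": "NULLABLE"}
    |  ]
    |}
  """.stripMargin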