0

我需要SCollection根据时间戳将元素保存到不同的每小时 BigQuery 表中。我尝试了以下方法 -

  1. 对元素进行分组(TableName, Iterable[TableRow]),然后使用实例将每个元素保存Iterable[TableRow]到各自的表中。BigQueryClient这不起作用,因为BigQueryClient不可序列化。

  2. 创建一个SCollection[TableName, PCollection[TableRow]],然后PCollection[TableRow]使用BigQueryIO.Write. 要创建PCollection[TableRow]我使用的对象.map(s => (s._1, sc.pipeline.apply(Create.of(s._2.toList.asJava))))scScioContext 的实例在哪里。这不起作用,因为ScioContext不可序列化。

有没有办法将插入元素流式传输到不同的 BigQuery 表中?

4

2 回答 2

2

要使用 Scio 执行此操作,您可以创建自定义输出转换,以写入由DynamicDestinations对象 ( Apache Beam ) 指定的目标。该表由输入元素的某些特征动态确定,在本例中为元素的事件时间(小时)。

BigQuery 的自定义输出转换:

import com.google.api.services.bigquery.model.TableSchema
import com.spotify.scio.bigquery.BigQueryUtil
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition
import org.apache.beam.sdk.io.gcp.bigquery._
import org.apache.beam.sdk.transforms.PTransform
import org.apache.beam.sdk.values.{PCollection, PDone, ValueInSingleWindow}


def saveAsBigQuery(tblPrefix: String,
                   tblSchema: String,
                   writeDisposition: WriteDisposition):
  PTransform[PCollection[TableRow], PDone] = {

  BigQueryIO.writeTableRows()
    .to(new DynamicDestinations[TableRow, String] {

      override def getTable(tblSuffix: String): TableDestination = {
        // TODO: construct table name
        val tblName = "%s_%s".format(tblPrefix, tblSuffix)
        new TableDestination(tblName, null)
      }

      override def getDestination(tblRow: ValueInSingleWindow[TableRow]): String = {
        // TODO: determine hourly table suffix based on event time in tblRow object
      }

      override def getSchema(destination: String): TableSchema = {
        BigQueryUtil.parseSchema(tblSchema)
      }
    })
    .withWriteDisposition(writeDisposition)
    .asInstanceOf[PTransform[PCollection[TableRow], PDone]]
}

使用上面的函数应用自定义输出转换:

import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write


val tblPrefix = "table_prefix"
val tblSchema = "table_schema"  // TODO: this needs to be in valid BigQuery schema format
val writeDisposition = Write.WriteDisposition.WRITE_APPEND

val bqTransform = saveAsBigQuery(
  tblPrefix,
  tblSchema,
  writeDisposition)

// assuming tblRows is an SCollection[TableRow]
tblRows.saveAsCustomOutput("saveAsBigQuery", bqTransform)
于 2017-12-12T08:29:04.473 回答
0

在 Beam 中,BigQuery IO 转换提供了几种基于当前窗口选择表的方法。我相信 Dataflow 1.9 对依赖于窗口的目的地有类似的方法。

Dataflow 2.0 还包括DynamicDestinations。有关根据每个元素中的用户 ID 选择表的示例,请参阅 Javadoc。

我对 Scio 不熟悉,但似乎从 BigQuery IO 公开底层方法将是完成此任务的最简单方法。

于 2017-06-19T17:30:17.847 回答