我正在尝试使用 BigQuery 加载作业从我的 Apache Beam 管道向 BigQuery 添加行。我正在处理的初始数据来自 Postgresql 数据库,并使用JdbcIO
数据源读入 Beam:
@Override
public PCollection<Intent> expand(PBegin input) {
return input.apply(JdbcIO.<Intent>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
.create(DASHBOARD_DB_DRIVER, getJdbcConnectionString()))
.withQuery(QUERY)
.withCoder(ProtoCoder.of(Intent.class))
.withRowMapper(new IntentParser()));
}
BigQuery 编写器:
@Override
public WriteResult expand(PCollection<KV<StateTable, StateRow>> events) {
Write<KV<StateTable, StateRow>> writer = BigQueryIO.<KV<StateTable, StateRow>>write()
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withFormatFunction(new StateRowMapper())
.to(x -> x.getValue().getKey().getDestination());
return events.apply(writer);
}
当我执行管道时,我得到了数千个以下日志条目(我猜每行一个),这个过程需要很长时间(+10 分钟)。
INFO: Opening TableRowWriter to gs://piesync-analytics-temp-storage/BigQueryWriteTemp/16b8edc635ac4dfcaf2494b0217be8f8/15a00ba8-70ff-43ad-99f7-0090b9465d0b.
Mar 14, 2018 12:13:46 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>
我检查了谷歌云存储中的临时文件(BigQuery 加载作业需要),并注意到每个文件只包含 1 行的数据。
我对此很担心,因为当我使用相同的数据执行完全相同的操作,但从文件而不是数据库中读取时(使用TextIO
),我只会得到几十个日志条目,而临时文件包含数千条 BigQuery 记录。在这种情况下,该过程在不到一分钟的时间内完成
我没有指定任何窗口或触发,因为我只是想读取一次源。
我想我必须启用某种批处理,但我不知道什么和在哪里。