0

我正在尝试使用 BigQuery 加载作业从我的 Apache Beam 管道向 BigQuery 添加行。我正在处理的初始数据来自 Postgresql 数据库,并使用JdbcIO数据源读入 Beam:

@Override
public PCollection<Intent> expand(PBegin input) {
    return input.apply(JdbcIO.<Intent>read()
                             .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
                                 .create(DASHBOARD_DB_DRIVER, getJdbcConnectionString()))
                             .withQuery(QUERY)
                             .withCoder(ProtoCoder.of(Intent.class))
                             .withRowMapper(new IntentParser()));

}

BigQuery 编写器:

@Override
public WriteResult expand(PCollection<KV<StateTable, StateRow>> events) {
    Write<KV<StateTable, StateRow>> writer = BigQueryIO.<KV<StateTable, StateRow>>write()
                                                       .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                                                       .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                                                       .withFormatFunction(new StateRowMapper())
                                                       .to(x -> x.getValue().getKey().getDestination());

    return events.apply(writer);
}

当我执行管道时,我得到了数千个以下日志条目(我猜每行一个),这个过程需要很长时间(+10 分钟)。

INFO: Opening TableRowWriter to gs://piesync-analytics-temp-storage/BigQueryWriteTemp/16b8edc635ac4dfcaf2494b0217be8f8/15a00ba8-70ff-43ad-99f7-0090b9465d0b.
Mar 14, 2018 12:13:46 PM org.apache.beam.sdk.io.gcp.bigquery.TableRowWriter <init>

我检查了谷歌云存储中的临时文件(BigQuery 加载作业需要),并注意到每个文件只包含 1 行的数据。

我对此很担心,因为当我使用相同的数据执行完全相同的操作,但从文件而不是数据库中读取时(使用TextIO),我只会得到几十个日志条目,而临时文件包含数千条 BigQuery 记录。在这种情况下,该过程在不到一分钟的时间内完成

我没有指定任何窗口或触发,因为我只是想读取一次源。

我想我必须启用某种批处理,但我不知道什么和在哪里。

4

0 回答 0