The requirement is to read the most recently updated records from BQ and load them into CloudSQL.
These are the steps performed:
Read the BQ table records whose LAST_UPD_TS is greater than the last stored value:

PCollection<TableRow> read_from_bq = pipeline.apply("Read from BQ",
    BigQueryIO.readTableRows()
        .withTemplateCompatibility()
        .fromQuery("select * from `pdata.DEPT` WHERE LAST_UPD_TS >= 'parametervalue'")
        .withoutValidation()
        .usingStandardSql());

The records are then inserted into CloudSQL:

read_from_bq.apply("Insert and Update", JdbcIO.<TableRow>write()
    .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
        ValueProvider.StaticValueProvider.of("com.mysql.jdbc.Driver"), jdbcUrlValueProvider))
    .withStatement("insert into DEPT (LOC_DIM_ID,DIVN_NBR,DEPT_NBR,END_DT,START_DT,PRC_OPT_CD,PRN_LVL_CD,PRICE_LOC_NBR,LAST_UPD_TS,LAST_UPD_USERID)" +
        " values(?,?,?,?,?,?,?,?,?,?)" +
        " ON DUPLICATE KEY UPDATE START_DT=?,PRC_OPT_CD=?,PRN_LVL_CD=?,PRICE_LOC_NBR=?,LAST_UPD_TS=?,LAST_UPD_USERID=?")
    .withPreparedStatementSetter(new DEPT_BULKPreparedStatementSetters()));

PipelineResult.State state = pipeline.run().waitUntilFinish();
The above code runs perfectly without any issues.
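For reference, 'parametervalue' in the query above is fixed when the pipeline is constructed. If it only needed to be a launch-time parameter, it could be wired through a ValueProvider option. This is a minimal sketch, not part of the original code, and it assumes a hypothetical getLastUpdTs() ValueProvider<String> option declared on the pipeline options interface (BQ_TO_CSQL_Options, used further below):

BQ_TO_CSQL_Options options = pipeline.getOptions().as(BQ_TO_CSQL_Options.class);

// Build the query lazily so the (hypothetical) lastUpdTs option can be resolved at runtime.
// Uses org.apache.beam.sdk.options.ValueProvider.NestedValueProvider and
// org.apache.beam.sdk.transforms.SerializableFunction.
ValueProvider<String> query = ValueProvider.NestedValueProvider.of(
    options.getLastUpdTs(),
    (SerializableFunction<String, String>) ts ->
        "select * from `pdata.DEPT` WHERE LAST_UPD_TS >= '" + ts + "'");

PCollection<TableRow> read_from_bq = pipeline.apply("Read from BQ",
    BigQueryIO.readTableRows()
        .withTemplateCompatibility()
        .fromQuery(query) // fromQuery(ValueProvider<String>) overload
        .withoutValidation()
        .usingStandardSql());

That still requires the value to be supplied from outside when the job is launched, whereas the goal here is to fetch it from CloudSQL inside the same pipeline.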
Now the requirement is to first read the LAST_UPD_TS column from the existing records in the CloudSQL table and set that value as the parameter for the BigQuery query inside the pipeline. I am checking the value with System.out.println(tableRowTypedRead.getTable().get("loc_dim")); in the code below. How can the LAST_UPD_TS value fetched from CloudSQL, as the first step of the pipeline, be passed as an inline parameter to the BQ query in the same pipeline?
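For example, if the CloudSQL DEPT table currently held MAX(last_upd_ts) = '2021-05-01 10:15:30' (an illustrative value), the BQ step of that run should effectively execute:

select * from `pdata.DEPT` WHERE LAST_UPD_TS >= '2021-05-01 10:15:30'

This is what I have tried so far: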
PCollection<String> last_upd_td = pipeline.apply("get latest ts", JdbcIO.<String>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(ValueProvider.StaticValueProvider.of("com.mysql.jdbc.Driver"), jdbcUrlValueProvider))
.withQuery("select MAX(last_upd_ts) AS last_upd_ts from DEPT")
.withCoder(AvroCoder.of(String.class))
.withRowMapper((JdbcIO.RowMapper<String>) resultSet -> {
return resultSet.getString("last_upd_ts");
})
);
PCollection<BigQueryIO.TypedRead<TableRow>> bq = last_upd_td.apply(ParDo.of(new DoFn<String, BigQueryIO.TypedRead<TableRow>>() {
@ProcessElement
public void processElement(ProcessContext c, @Element String last_upd_td ) throws Exception {
BQ_TO_CSQL_Options as = c.getPipelineOptions().as(BQ_TO_CSQL_Options.class);
BigQueryIO.TypedRead<TableRow> tableRowTypedRead = BigQueryIO.readTableRows()
.withTemplateCompatibility()
.fromQuery("select * from `pdata.DEPT` WHERE LAST_UPD_TS >= " + last_upd_td))
.withoutValidation().usingStandardSql();
System.out.println(tableRowTypedRead.getTable().get("loc_dim"));
c.output(tableRowTypedRead);
}
}));
bq.apply("Insert and Update", JdbcIO.<BigQueryIO.TypedRead<TableRow>>write()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(ValueProvider.StaticValueProvider.of("com.mysql.jdbc.Driver"), jdbcUrlValueProvider))
.withStatement("insert into DEPT (LOC_DIM_ID,DIVN_NBR,DEPT_NBR,END_DT,START_DT,PRC_OPT_CD,PRN_LVL_CD,PRICE_LOC_NBR,LAST_UPD_TS,LAST_UPD_USERID)" +
"values( ?,?,?,?,?,?,?,?,?,?)" +
"ON DUPLICATE KEY UPDATE START_DT=?,PRC_OPT_CD=?,PRN_LVL_CD=?,PRICE_LOC_NBR=?,LAST_UPD_TS=?,LAST_UPD_USERID=?")
.withPreparedStatementSetter(new DEPT_BULKPreparedStatementSetters())
);
PipelineResult.State state = pipeline.run().waitUntilFinish();
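For comparison, the lookup could be done with plain JDBC before the Beam pipeline is constructed and the result concatenated into the query string. This is only a minimal sketch and not part of the pipeline above; jdbcUrl, dbUser and dbPassword are illustrative placeholders, and the enclosing method is assumed to declare throws SQLException:

// Construction-time lookup (sketch): resolve MAX(last_upd_ts) with java.sql before
// the pipeline graph is built.
String lastUpdTs = null;
try (Connection conn = DriverManager.getConnection(jdbcUrl, dbUser, dbPassword);
     Statement stmt = conn.createStatement();
     ResultSet rs = stmt.executeQuery("select MAX(last_upd_ts) AS last_upd_ts from DEPT")) {
    if (rs.next()) {
        lastUpdTs = rs.getString("last_upd_ts");
    }
}

// The resolved value is then inlined into the BQ query at construction time.
PCollection<TableRow> read_from_bq = pipeline.apply("Read from BQ",
    BigQueryIO.readTableRows()
        .withTemplateCompatibility()
        .fromQuery("select * from `pdata.DEPT` WHERE LAST_UPD_TS >= '" + lastUpdTs + "'")
        .withoutValidation()
        .usingStandardSql());

The goal, however, is to perform this lookup as the first step of the same pipeline, as attempted in the code above.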