
The requirement is to read the most recently updated records from BQ and load them into CloudSQL.

These are the steps the pipeline performs:

  1. Read the BQ table records whose LAST_UPD_TS is at or after a given value:

         PCollection<TableRow> read_from_bq = pipeline.apply("Read from BQ", BigQueryIO.readTableRows()
                 .withTemplateCompatibility()
                 .fromQuery("select * from pdata.DEPT WHERE LAST_UPD_TS >= 'parametervalue'")
                 .withoutValidation()
                 .usingStandardSql());

  2. Insert the records into CloudSQL, updating existing rows on a duplicate key:

         read_from_bq.apply("Insert and Update", JdbcIO.<TableRow>write()
                 .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                         ValueProvider.StaticValueProvider.of("com.mysql.jdbc.Driver"), jdbcUrlValueProvider))
                 .withStatement("insert into DEPT (LOC_DIM_ID,DIVN_NBR,DEPT_NBR,END_DT,START_DT,PRC_OPT_CD,PRN_LVL_CD,PRICE_LOC_NBR,LAST_UPD_TS,LAST_UPD_USERID) " +
                         "values (?,?,?,?,?,?,?,?,?,?) " +
                         "ON DUPLICATE KEY UPDATE START_DT=?,PRC_OPT_CD=?,PRN_LVL_CD=?,PRICE_LOC_NBR=?,LAST_UPD_TS=?,LAST_UPD_USERID=?")
                 .withPreparedStatementSetter(new DEPT_BULKPreparedStatementSetters()));
         PipelineResult.State state = pipeline.run().waitUntilFinish();
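In step 2 the statement contains 16 placeholders (10 in the values list plus 6 in the update clause), so the prepared-statement setter has to bind the six update columns a second time at indexes 11 to 16. A possible simplification, assuming the target is MySQL (as the com.mysql.jdbc.Driver class suggests), is MySQL's VALUES(col) function in the ON DUPLICATE KEY UPDATE clause, which refers to the value that would have been inserted. The plain-Java sketch below only builds and checks that statement string; the class and method names are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;

public class DeptUpsert {
    // Column lists copied from the original statement.
    static final List<String> INSERT_COLS = List.of(
            "LOC_DIM_ID", "DIVN_NBR", "DEPT_NBR", "END_DT", "START_DT",
            "PRC_OPT_CD", "PRN_LVL_CD", "PRICE_LOC_NBR", "LAST_UPD_TS", "LAST_UPD_USERID");
    static final List<String> UPDATE_COLS = List.of(
            "START_DT", "PRC_OPT_CD", "PRN_LVL_CD", "PRICE_LOC_NBR", "LAST_UPD_TS", "LAST_UPD_USERID");

    // Hypothetical helper: builds the upsert with one placeholder per inserted column.
    static String buildStatement() {
        String cols = String.join(",", INSERT_COLS);
        String marks = INSERT_COLS.stream().map(c -> "?").collect(Collectors.joining(","));
        // VALUES(col) reuses the value from the values list, so the update clause
        // needs no placeholders of its own.
        String updates = UPDATE_COLS.stream()
                .map(c -> c + "=VALUES(" + c + ")")
                .collect(Collectors.joining(","));
        return "INSERT INTO DEPT (" + cols + ") VALUES (" + marks + ") ON DUPLICATE KEY UPDATE " + updates;
    }

    public static void main(String[] args) {
        String sql = buildStatement();
        long placeholders = sql.chars().filter(ch -> ch == '?').count();
        System.out.println(sql);
        System.out.println("placeholders: " + placeholders);
    }
}
```

With this form the setter would bind only the 10 insert parameters. MySQL 8.0.20 and later deprecate VALUES() in this position in favour of a row alias (INSERT ... VALUES (...) AS new ON DUPLICATE KEY UPDATE col = new.col), although it is still accepted.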

The code above runs without any issues.

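The new requirement is to first read the LAST_UPD_TS column from the existing records in the CloudSQL table and use that value as the parameter of the BigQuery query in the pipeline. The code below is my attempt; it includes a System.out.println(tableRowTypedRead.getTable().get("loc_dim")); call. How can the LAST_UPD_TS value fetched from CloudSQL in the first step of the pipeline be passed as an inline parameter to the BQ query in the same pipeline?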
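Whatever mechanism delivers the CloudSQL value to the query, the query text itself needs the timestamp as a quoted literal: concatenating the raw string without quotes (`>= 2023-05-01 10:15:30`) is not valid SQL. A minimal sketch, assuming the value arrives as the string returned by resultSet.getString("last_upd_ts") and that BigQuery should compare it the same way step 1 compares 'parametervalue', i.e. as a string literal coerced to the column's type. IncrementalQuery, buildQuery and DEFAULT_WATERMARK are hypothetical names; the fallback covers MAX() returning NULL when DEPT is empty.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class IncrementalQuery {
    // Hypothetical lower bound used when MAX(last_upd_ts) is NULL (empty DEPT table).
    static final String DEFAULT_WATERMARK = "1970-01-01 00:00:00";

    private static final DateTimeFormatter BQ_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSS");

    // Builds the incremental BigQuery query for a watermark string read from MySQL.
    static String buildQuery(String lastUpdTs) {
        String raw = (lastUpdTs == null) ? DEFAULT_WATERMARK : lastUpdTs.trim();
        // Timestamp.valueOf only accepts yyyy-[m]m-[d]d hh:mm:ss[.f...] and throws
        // IllegalArgumentException otherwise, so arbitrary text cannot reach the SQL.
        LocalDateTime ts = Timestamp.valueOf(raw).toLocalDateTime();
        return "SELECT * FROM `pdata.DEPT` WHERE LAST_UPD_TS >= '" + ts.format(BQ_TS) + "'";
    }

    public static void main(String[] args) {
        System.out.println(buildQuery("2023-05-01 10:15:30"));
        System.out.println(buildQuery(null));
    }
}
```

A MySQL DATETIME carries no time zone, so if LAST_UPD_TS is a TIMESTAMP column in BigQuery the literal is interpreted as UTC; whether that matches the CloudSQL values is an assumption to check.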

       PCollection<String> last_upd_td = pipeline.apply("get latest ts", JdbcIO.<String>read()
                        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(ValueProvider.StaticValueProvider.of("com.mysql.jdbc.Driver"), jdbcUrlValueProvider))
                        .withQuery("select MAX(last_upd_ts) AS last_upd_ts from DEPT")
                        .withCoder(AvroCoder.of(String.class))
                        .withRowMapper((JdbcIO.RowMapper<String>) resultSet -> {
                            return resultSet.getString("last_upd_ts");
                        })
        );
        
        PCollection<BigQueryIO.TypedRead<TableRow>> bq = last_upd_td.apply(ParDo.of(new DoFn<String, BigQueryIO.TypedRead<TableRow>>() {
            @ProcessElement
            public void processElement(ProcessContext c, @Element String last_upd_td ) throws Exception {
                BQ_TO_CSQL_Options as = c.getPipelineOptions().as(BQ_TO_CSQL_Options.class);
                BigQueryIO.TypedRead<TableRow> tableRowTypedRead = BigQueryIO.readTableRows()
                        .withTemplateCompatibility()
                        .fromQuery("select * from `pdata.DEPT` WHERE LAST_UPD_TS >= '" + last_upd_td + "'")
                        .withoutValidation().usingStandardSql();
                System.out.println(tableRowTypedRead.getTable().get("loc_dim"));
                c.output(tableRowTypedRead);
            }
        }));

        bq.apply("Insert and Update", JdbcIO.<BigQueryIO.TypedRead<TableRow>>write()
                .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(ValueProvider.StaticValueProvider.of("com.mysql.jdbc.Driver"), jdbcUrlValueProvider))
                .withStatement("insert into DEPT (LOC_DIM_ID,DIVN_NBR,DEPT_NBR,END_DT,START_DT,PRC_OPT_CD,PRN_LVL_CD,PRICE_LOC_NBR,LAST_UPD_TS,LAST_UPD_USERID)" +
                        "values( ?,?,?,?,?,?,?,?,?,?)" +
                        "ON DUPLICATE KEY UPDATE START_DT=?,PRC_OPT_CD=?,PRN_LVL_CD=?,PRICE_LOC_NBR=?,LAST_UPD_TS=?,LAST_UPD_USERID=?")
                .withPreparedStatementSetter(new DEPT_BULKPreparedStatementSetters())
        );

        PipelineResult.State state = pipeline.run().waitUntilFinish();
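One possible direction, sketched here under assumptions: a BigQueryIO read is a PTransform that takes effect only when it is applied to the pipeline at construction time, so creating a TypedRead inside a DoFn and emitting it as an element does not run the query. If the watermark only needs to be read once per run, it could be fetched with plain JDBC in main() before the pipeline is built. The class and method names below are hypothetical, the credentials are assumed to be embedded in the JDBC URL (as with jdbcUrlValueProvider), and the MySQL driver must be on the classpath at runtime.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class WatermarkLookup {
    // Reads MAX(last_upd_ts) from DEPT with plain JDBC, outside any Beam transform.
    // Returns null when DEPT is empty, because MAX over zero rows is NULL.
    static String readMaxLastUpdTs(String jdbcUrl) throws SQLException {
        try (Connection conn = DriverManager.getConnection(jdbcUrl);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("select MAX(last_upd_ts) AS last_upd_ts from DEPT")) {
            return rs.next() ? rs.getString("last_upd_ts") : null;
        }
    }

    public static void main(String[] args) throws SQLException {
        if (args.length != 1) {
            System.err.println("usage: WatermarkLookup <jdbcUrl>");
            return;
        }
        // In a real launcher this value would be quoted into the query string
        // before pipeline.apply(...) is called.
        System.out.println("MAX(last_upd_ts) = " + readMaxLastUpdTs(args[0]));
    }
}
```

The returned value (or a default when it is null) would then replace 'parametervalue' in the fromQuery(...) call from step 1, and the DoFn that emits a TypedRead would no longer be needed. If the job is staged as a classic Dataflow template (withTemplateCompatibility() hints that it might be), main() runs when the template is created rather than on each launch, so the value would likely be fixed at template-creation time and a different mechanism would be required.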