0

我有一个 Apache Beam 任务,它使用 JDBC 从 MySQL 源读取数据,它应该将数据按原样写入 BigQuery 表。此时不执行任何转换,稍后会进行转换,目前我只想将数据库输出直接写入 BigQuery。

这是尝试执行此操作的主要方法:

    public static void main(String[] args) {
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

        Pipeline p = Pipeline.create(options);

        // Build the table schema for the output table.
        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("phone").setType("STRING"));
        fields.add(new TableFieldSchema().setName("url").setType("STRING"));
        TableSchema schema = new TableSchema().setFields(fields);

        p.apply(JdbcIO.<KV<String, String>>read()
         .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
             "com.mysql.jdbc.Driver", "jdbc:mysql://host:3306/db_name")
             .withUsername("user")
             .withPassword("pass"))
             .withQuery("SELECT phone_number, identity_profile_image FROM scraper_caller_identities LIMIT 100")
             .withRowMapper(new JdbcIO.RowMapper<KV<String, String>>() {
                public KV<String, String> mapRow(ResultSet resultSet) throws Exception {
                return KV.of(resultSet.getString(1), resultSet.getString(2));
             }
          })
         .apply(BigQueryIO.Write
            .to(options.getOutput())
            .withSchema(schema)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)));

        p.run();
    }

但是当我使用 maven 执行模板时,出现以下错误:

Test.java:[184,6] 找不到符号符号:方法 apply(com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.Bound)
位置:类 org.apache.beam.sdk.io.jdbc。 JdbcIO.Read<com.google.cloud.dataflow.sdk.values.KV<java.lang.String,java.lang.String>>

看来我没有通过 BigQueryIO.Write 预期的数据收集,这就是我目前正在努力解决的问题。

在这种情况下,如何使来自 MySQL 的数据符合 BigQuery 的期望?

4

1 回答 1

2

我认为您需要向 BigQueryIO.Write 提供 PCollection<TableRow> 而不是 RowMapper 输出的 PCollection<KV<String,String>> 类型。

此外,请在设置 TableRow 时使用正确的列名和值对。注意:我认为您的 KV 是电话和 url 值(例如 {"555-555-1234": " http://www.url.com "}),而不是列名和值对(例如 {"phone ": "555-555-1234", "url": " http://www.url.com "})

请参阅此处的示例: https ://beam.apache.org/documentation/sdks/javadoc/0.5.0/

你能试试这个,让我知道它是否适合你?希望这可以帮助。

于 2017-02-22T01:33:56.267 回答