0

我需要将一个文件加载到我的数据库中,但在此之前我必须根据一些文件数据验证数据库中是否存在数据。例如,假设我在一个文件中有 5 条记录,那么我必须在数据库中检查 5 次以获取单独的记录。

那么我怎样才能动态地得到这个值呢?我们必须传递动态值而不是 2 (preparedStatement.setString(1, "2");)

在这里,我们正在创建一个 Dataflow 管道,它使用 Apache Beam 将数据加载到数据库中。现在我们创建一个管道对象并创建一个管道。使用 PCollection 我们存储到数据库中。

Pipeline p = Pipeline.create(options);
p.apply("Reading Text", TextIO.read().from(options.getInputFile()))
    .apply(ParDo.of(new FilterHeaderFn(csvHeader)))
    .apply(ParDo.of(new GetRatePlanID()))
    .apply("Format Result", MapElements.into(
        TypeDescriptors.strings()).via(
        (KV < String, Integer > ABC) - >
        ABC.getKey() + "," + ABC.getValue()))
    .apply("Write File", TextIO.write()
        .to(options.getOutputFile())
        .withoutSharding());

// Retrieving data from database
PCollection < String > data =
    p.apply(JdbcIO. < String > read()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                "com.mysql.cj.jdbc.Driver", "jdbc:mysql://localhost:3306/XYZ")
            .withUsername("root")
            .withPassword("root1234"))

        .withQuery("select * from xyz where z = ?")
        .withCoder(StringUtf8Coder.of())
        .withStatementPreparator(new JdbcIO.StatementPreparator() {
            private static final long serialVersionUID = 1 L;
            @Override
            public void setParameters(PreparedStatement preparedStatement) throws Exception {
                preparedStatement.setString(1, "2");

            }
        })
        .withRowMapper(new JdbcIO.RowMapper < String > () {
            private static final long serialVersionUID = 1 L;
            public String mapRow(ResultSet resultSet) throws Exception {
                return "Symbol: " + resultSet.getInt(1) + "\nPrice: " + resultSet.getString(2) +

                    "\nCompany: " + resultSet.getInt(3);
            }
        }));
4

1 回答 1

0

正如建议的那样,最有效的方法可能是将整个文件加载到临时表中,然后进行查询以更新必要的行。

如果无法做到这一点,您可以改为将表读入 Dataflow(即"select * from xyz"),然后执行 join/CoGroupByKey 以将记录与文件中找到的记录匹配。如果您希望现有数据库与您希望上传到其中的文件相比非常大,您可以使用 DoFn 直接使用 JDBC 对您的数据库进行查询(可能在 DoFn 的 setUp 方法中缓存连接)而不是使用 JdbcIO。

于 2021-03-17T00:56:54.520 回答