我需要将一个文件加载到我的数据库中,但在此之前我必须根据一些文件数据验证数据库中是否存在数据。例如,假设我在一个文件中有 5 条记录,那么我必须在数据库中检查 5 次以获取单独的记录。
那么我怎样才能动态地得到这个值呢?我们必须传递动态值而不是 2 (preparedStatement.setString(1, "2");)
在这里,我们正在创建一个 Dataflow 管道,它使用 Apache Beam 将数据加载到数据库中。现在我们创建一个管道对象并创建一个管道。使用 PCollection 我们存储到数据库中。
Pipeline p = Pipeline.create(options);
p.apply("Reading Text", TextIO.read().from(options.getInputFile()))
.apply(ParDo.of(new FilterHeaderFn(csvHeader)))
.apply(ParDo.of(new GetRatePlanID()))
.apply("Format Result", MapElements.into(
TypeDescriptors.strings()).via(
(KV < String, Integer > ABC) - >
ABC.getKey() + "," + ABC.getValue()))
.apply("Write File", TextIO.write()
.to(options.getOutputFile())
.withoutSharding());
// Retrieving data from database
PCollection < String > data =
p.apply(JdbcIO. < String > read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
"com.mysql.cj.jdbc.Driver", "jdbc:mysql://localhost:3306/XYZ")
.withUsername("root")
.withPassword("root1234"))
.withQuery("select * from xyz where z = ?")
.withCoder(StringUtf8Coder.of())
.withStatementPreparator(new JdbcIO.StatementPreparator() {
private static final long serialVersionUID = 1 L;
@Override
public void setParameters(PreparedStatement preparedStatement) throws Exception {
preparedStatement.setString(1, "2");
}
})
.withRowMapper(new JdbcIO.RowMapper < String > () {
private static final long serialVersionUID = 1 L;
public String mapRow(ResultSet resultSet) throws Exception {
return "Symbol: " + resultSet.getInt(1) + "\nPrice: " + resultSet.getString(2) +
"\nCompany: " + resultSet.getInt(3);
}
}));