
The program below connects to Oracle 11g and fetches records. However, it throws a NullPointerException for the coder at pipeline.apply().

I have added ojdbc14.jar to the project dependencies.

public static void main(String[] args) {

        Pipeline p = Pipeline.create(PipelineOptionsFactory.create());      
         p.apply(JdbcIO.<KV<Integer, String>>read()
                   .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                          "oracle.jdbc.driver.OracleDriver", "jdbc:oracle:thin:@hostdnsname:port/servicename")
                   .withUsername("uname")
                   .withPassword("pwd"))
                   .withQuery("select EMPID,NAME from EMPLOYEE1")
                   .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
                     public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
                       return KV.of(resultSet.getInt(1), resultSet.getString(2));
                     }
                   }));
         p.run();

    }

It gives the following error. Any clues?

Exception in thread "main" java.lang.NullPointerException: coder
    at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:228)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$Read.validate(JdbcIO.java:283)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$Read.validate(JdbcIO.java:216)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:399)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:307)
    at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:47)
    at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:158)
    at org.apache.beam.examples.v030.JdbcUtil.main(JdbcUtil.java:21)

2 Answers


Hi!

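Sorry, the error message is not very helpful. The exception comes from a validation step that checks whether a coder has been set. I have filed BEAM-959 to improve the message.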

You need to provide a coder, e.g. via

.withCoder(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))

I have also filed BEAM-960 to make the coder inference automatic here, as it is in most other places in Beam.
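To illustrate why the read needs a coder at all: a coder is essentially an encode/decode pair that lets the runner turn each element into bytes and back. The following is a minimal sketch in plain Java, not Beam's implementation. `SimpleCoder`, `INT_CODER`, `STRING_CODER`, and `kvCoder` are hypothetical names made up for this example, and the byte formats (fixed 4-byte int, `writeUTF` strings) are simplifications that differ from what `VarIntCoder` and `StringUtf8Coder` actually write.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class CoderSketch {

    /** Hypothetical stand-in for a coder: an encode/decode pair for one type. */
    public interface SimpleCoder<T> {
        void encode(T value, DataOutputStream out) throws IOException;
        T decode(DataInputStream in) throws IOException;
    }

    /** Simplified int coder: fixed 4 bytes (assumption, not Beam's varint format). */
    public static final SimpleCoder<Integer> INT_CODER = new SimpleCoder<Integer>() {
        public void encode(Integer value, DataOutputStream out) throws IOException {
            out.writeInt(value);
        }
        public Integer decode(DataInputStream in) throws IOException {
            return in.readInt();
        }
    };

    /** Simplified string coder: length-prefixed text via writeUTF/readUTF. */
    public static final SimpleCoder<String> STRING_CODER = new SimpleCoder<String>() {
        public void encode(String value, DataOutputStream out) throws IOException {
            out.writeUTF(value);
        }
        public String decode(DataInputStream in) throws IOException {
            return in.readUTF();
        }
    };

    /** Composes a key coder and a value coder, mirroring the shape of KvCoder.of(k, v). */
    public static <K, V> SimpleCoder<Map.Entry<K, V>> kvCoder(
            final SimpleCoder<K> keyCoder, final SimpleCoder<V> valueCoder) {
        return new SimpleCoder<Map.Entry<K, V>>() {
            public void encode(Map.Entry<K, V> kv, DataOutputStream out) throws IOException {
                keyCoder.encode(kv.getKey(), out);     // key bytes first
                valueCoder.encode(kv.getValue(), out); // then value bytes
            }
            public Map.Entry<K, V> decode(DataInputStream in) throws IOException {
                K key = keyCoder.decode(in);
                V value = valueCoder.decode(in);
                return new SimpleEntry<K, V>(key, value);
            }
        };
    }

    /** Serializes one value with the given coder. */
    public static <T> byte[] toBytes(SimpleCoder<T> coder, T value) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buffer);
            coder.encode(value, out);
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Deserializes one value with the given coder. */
    public static <T> T fromBytes(SimpleCoder<T> coder, byte[] bytes) {
        try {
            return coder.decode(new DataInputStream(new ByteArrayInputStream(bytes)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        SimpleCoder<Map.Entry<Integer, String>> coder = kvCoder(INT_CODER, STRING_CODER);
        byte[] bytes = toBytes(coder, new SimpleEntry<Integer, String>(7, "Ada"));
        Map.Entry<Integer, String> roundTripped = fromBytes(coder, bytes);
        System.out.println(roundTripped.getKey() + " -> " + roundTripped.getValue());
    }
}
```

Without such a pair for `KV<Integer, String>`, there is no defined way to move the rows between workers, which is why the `validate()` step in the stack trace rejects a read with a null coder.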

answered 2016-11-10T21:04:06.730

Try this.

 pipeline.apply(JdbcIO.<KV<Integer, String>>read()
               // The coder tells Beam how to serialize the KV<Integer, String> rows.
               .withCoder(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of()))
               .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
                      "com.mysql.jdbc.Driver", "jdbc:mysql://localhost:3306/deepakgoyal")
                    .withUsername("root")
                    .withPassword("root"))
               .withQuery("select empid, name from employee")
               .withRowMapper(new JdbcIO.RowMapper<KV<Integer, String>>() {
                 public KV<Integer, String> mapRow(ResultSet resultSet) throws Exception {
                   return KV.of(resultSet.getInt(1), resultSet.getString(2));
                 }
               }));

And don't forget to add the MySQL connector jar to your project.

answered 2017-01-18T13:51:28.503