
When I run my pipeline from my local machine, I am able to update a table that resides in a Cloud SQL instance. But when I switch it to run with the DataflowRunner, the same pipeline fails with the exception below.

To connect from Eclipse, I create the data source configuration as .create("com.mysql.jdbc.Driver", "jdbc:mysql://<ip of sql instance>:3306/mydb").

When running with the Dataflow runner, I change it to .create("com.mysql.jdbc.GoogleDriver", "jdbc:google:mysql://<project-id>:<instance-name>/my-db").

  1. Should I add the instance's region information to the prefix?

The exception I get when I run it is:

Jun 22, 2017 6:53:58 PM org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2017-06-22T13:23:51.583Z: (840be37ab35d3d0d): Starting 2 workers in us-central1-f...
Jun 22, 2017 6:53:58 PM org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2017-06-22T13:23:51.634Z: (dabfae1dc9365d10): Executing operation JdbcIO.Read/Create.Values/Read(CreateSource)+JdbcIO.Read/ParDo(Read)+JdbcIO.Read/ParDo(Anonymous)+JdbcIO.Read/GroupByKey/Reify+JdbcIO.Read/GroupByKey/Write
Jun 22, 2017 6:54:49 PM org.apache.beam.runners.dataflow.util.MonitoringUtil$LoggingHandler process
INFO: 2017-06-22T13:24:44.762Z: (21395b94f8bf7f61): Workers have started successfully.

SEVERE: 2017-06-22T13:25:30.214Z: (3b988386f963503e): java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.sql.SQLException: Cannot load JDBC driver class 'com.mysql.jdbc.GoogleDriver'
    at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:289)
    at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:261)
    at com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:55)
    at com.google.cloud.dataflow.worker.graph.Networks$TypeSafeNodeFunction.apply(Networks.java:43)
    at com.google.cloud.dataflow.worker.graph.Networks.replaceDirectedNetworkNodes(Networks.java:78)
    at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory.create(MapTaskExecutorFactory.java:152)
    at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.doWork(DataflowWorker.java:272)
    at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244)
    at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:125)
    at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:105)
    at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:92)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.sql.SQLException: Cannot load JDBC driver class 'com.mysql.jdbc.GoogleDriver'
    at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:36)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$Read$ReadFn$auxiliary$M7MKjX9p.invokeSetup(Unknown Source)
    at com.google.cloud.dataflow.worker.runners.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.deserializeCopy(DoFnInstanceManagers.java:65)
    at com.google.cloud.dataflow.worker.runners.worker.DoFnInstanceManagers$ConcurrentQueueInstanceManager.peek(DoFnInstanceManagers.java:47)
    at com.google.cloud.dataflow.worker.runners.worker.UserParDoFnFactory.create(UserParDoFnFactory.java:100)
    at com.google.cloud.dataflow.worker.runners.worker.DefaultParDoFnFactory.create(DefaultParDoFnFactory.java:70)
    at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory.createParDoOperation(MapTaskExecutorFactory.java:365)
    at com.google.cloud.dataflow.worker.runners.worker.MapTaskExecutorFactory$3.typedApply(MapTaskExecutorFactory.java:278)
    ... 14 more

Any help in resolving this issue is much appreciated. This is my first attempt at running a Beam pipeline as a Dataflow job.

// Pipeline options for the Dataflow runner.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setNumWorkers(2);
options.setProject("xxxxx");
options.setStagingLocation("gs://xxxx/staging");
options.setRunner(DataflowRunner.class);
options.setStreaming(false);
options.setTempLocation("gs://xxxx/tempbucket");
options.setJobName("sqlpipeline");

Pipeline dataflowPipeline = Pipeline.create(options);

// Read the account table from Cloud SQL via JdbcIO.
PCollection<Account> collection = dataflowPipeline.apply(JdbcIO.<Account>read()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
                .create("com.mysql.jdbc.GoogleDriver", "jdbc:google:mysql://project-id:testdb/db")
                .withUsername("root").withPassword("root"))
        .withQuery(
                "select account_id,account_parent,account_description,account_type,account_rollup,Custom_Members from account")
        .withCoder(AvroCoder.of(Account.class))
        .withStatementPreparator(new JdbcIO.StatementPreparator() {
            public void setParameters(PreparedStatement preparedStatement) throws Exception {
                preparedStatement.setFetchSize(1);
                preparedStatement.setFetchDirection(ResultSet.FETCH_FORWARD);
            }
        })
        .withRowMapper(new JdbcIO.RowMapper<Account>() {
            public Account mapRow(ResultSet resultSet) throws Exception {
                Account account = new Account();
                account.setAccount_id(resultSet.getInt("account_id"));
                account.setAccount_parent(resultSet.getInt("account_parent"));
                account.setAccount_description(resultSet.getString("account_description"));
                account.setAccount_type(resultSet.getString("account_type"));
                account.setAccount_rollup(resultSet.getString("account_rollup"));
                account.setCustom_Members(resultSet.getString("Custom_Members"));
                return account;
            }
        }));

dataflowPipeline.run();

2 Answers


Have you pulled in the com.google.cloud.sql/mysql-socket-factory Maven dependency properly? It looks like the driver class cannot be loaded on the workers.

https://cloud.google.com/appengine/docs/standard/java/cloud-sql/#Java_Connect_to_your_database
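
For illustration, a minimal sketch of what that configuration could look like with the socket factory, assuming the com.google.cloud.sql:mysql-socket-factory artifact (plus the regular MySQL JDBC driver) is bundled with the Dataflow job; the instance connection name and database name below are placeholders, not values from the question:

// Sketch only: connect through the Cloud SQL socket factory instead of
// com.mysql.jdbc.GoogleDriver. Requires com.google.cloud.sql:mysql-socket-factory
// and mysql:mysql-connector-java on the worker classpath.
// <project-id>:<region>:<instance-name> and "mydb" are placeholders.
JdbcIO.DataSourceConfiguration config = JdbcIO.DataSourceConfiguration
        .create("com.mysql.jdbc.Driver",
                "jdbc:mysql://google/mydb"
                    + "?cloudSqlInstance=<project-id>:<region>:<instance-name>"
                    + "&socketFactory=com.google.cloud.sql.mysql.SocketFactory")
        .withUsername("root")
        .withPassword("root");

Note that the cloudSqlInstance value is the instance's full connection name, which includes the region.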

answered 2017-06-27T22:27:56.123

Hi, I think it is better to stick with "com.mysql.jdbc.Driver", since the Google driver is meant for App Engine deployments.

My pipeline configuration looks quite similar, and the following works fine for me:

PCollection<KV<Double, Double>> exchangeRates = p.apply(JdbcIO.<KV<Double, Double>>read()
        .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration
                .create("com.mysql.jdbc.Driver", "jdbc:mysql://ip:3306/dbname?user=root&password=root&useUnicode=true&characterEncoding=UTF-8"))
        .withQuery("SELECT PERIOD_YEAR, PERIOD_YEAR FROM SALE")
        .withCoder(KvCoder.of(DoubleCoder.of(), DoubleCoder.of()))
        .withRowMapper(new JdbcIO.RowMapper<KV<Double, Double>>() {
            @Override
            public KV<Double, Double> mapRow(java.sql.ResultSet resultSet) throws Exception {
                LOG.info(resultSet.getDouble(1) + " Came");
                return KV.of(resultSet.getDouble(1), resultSet.getDouble(2));
            }
        }));
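
One thing to check with this direct-IP approach: the Cloud SQL instance has to accept connections from the Dataflow workers (for example by adding their external IPs to the instance's authorized networks), and the plain MySQL driver (mysql-connector-java) still needs to be packaged with the job.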

Hope this helps.

answered 2017-08-02T11:21:23.037