We are implementing a new data warehouse on Google BigQuery, and all of our sources are in an on-prem database. We are therefore using Dataflow for ETL, built with Maven and the Apache Beam SDK, to run 30 pipelines on the Google Cloud Dataflow service.

package com.google.cloud.teleport.templates; 
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.io.DynamicJdbcIO;
import com.google.cloud.teleport.templates.common.JdbcConverters;
import com.google.cloud.teleport.util.KMSEncryptedNestedValueProvider;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;

public class MToBQ {

  private static ValueProvider<String> maybeDecrypt(
      ValueProvider<String> unencryptedValue, ValueProvider<String> kmsKey) {
    return new KMSEncryptedNestedValueProvider(unencryptedValue, kmsKey);
  }
  public static void main(String[] args) {
    JdbcConverters.JdbcToBigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(JdbcConverters.JdbcToBigQueryOptions.class);

    run(options);
  }
  private static PipelineResult run(JdbcConverters.JdbcToBigQueryOptions options) {
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(
            "Read abcc",
            DynamicJdbcIO.<TableRow>read()
                .withDataSourceConfiguration(
                    DynamicJdbcIO.DynamicDataSourceConfiguration.create(
                            options.getDriverClassName(),
                            maybeDecrypt(options.getConnectionURL(), options.getKMSEncryptionKey()))
                        .withUsername(
                            maybeDecrypt(options.getUsername(), options.getKMSEncryptionKey()))
                        .withPassword(
                            maybeDecrypt(options.getPassword(), options.getKMSEncryptionKey()))
                        .withDriverJars(options.getDriverJars())
                        .withConnectionProperties(options.getConnectionProperties()))
                .withQuery("select * from abcc")
                .withCoder(TableRowJsonCoder.of())
                .withRowMapper(JdbcConverters.getResultSetToTableRow()))
        .apply(
            "Target",
            BigQueryIO.writeTableRows()
                .withoutValidation()
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
                .to("dev-27:staging.STG_ABC"));
    pipeline
        .apply(
            "Read XYZ",
            DynamicJdbcIO.<TableRow>read()
                .withDataSourceConfiguration(
                    DynamicJdbcIO.DynamicDataSourceConfiguration.create(
                            options.getDriverClassName(),
                            maybeDecrypt(options.getConnectionURL(), options.getKMSEncryptionKey()))
                        .withUsername(
                            maybeDecrypt(options.getUsername(), options.getKMSEncryptionKey()))
                        .withPassword(
                            maybeDecrypt(options.getPassword(), options.getKMSEncryptionKey()))
                        .withDriverJars(options.getDriverJars())
                        .withConnectionProperties(options.getConnectionProperties()))
                .withQuery("SELECT * FROM XYZ")
                .withCoder(TableRowJsonCoder.of())
                .withRowMapper(JdbcConverters.getResultSetToTableRow()))
        .apply(
            "Write STG_XYZ",
            BigQueryIO.writeTableRows()
                .withoutValidation()
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
                .to("dev-27:staging.STG_XYZ")); 
    return pipeline.run();
  }
}

If the table holds only a small amount of data, the job runs successfully. When the data runs into millions of rows, it throws an error like the following:

org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: DEADLINE_EXCEEDED: (g)RPC timed out
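
One thing I am unsure about is whether the single "select *" read is the bottleneck, since the whole result set comes through one JDBC connection. As a rough sketch of what I am considering (this uses the standard Beam JdbcIO from org.apache.beam.sdk.io.jdbc rather than the Teleport DynamicJdbcIO above, omits the KMS decryption step for brevity, and the fetch size value is just a guess on my part):

// Sketch only, not tested: standard Beam JdbcIO instead of the Teleport
// DynamicJdbcIO, with KMS decryption omitted for brevity. The fetch size of
// 50000 is an arbitrary value I picked, not a documented recommendation.
pipeline.apply(
    "Read abcc with fetch size",
    JdbcIO.<TableRow>read()
        .withDataSourceConfiguration(
            JdbcIO.DataSourceConfiguration.create(
                    options.getDriverClassName(), options.getConnectionURL())
                .withUsername(options.getUsername())
                .withPassword(options.getPassword()))
        .withQuery("SELECT * FROM abcc")
        .withRowMapper(JdbcConverters.getResultSetToTableRow())
        .withCoder(TableRowJsonCoder.of())
        // The JDBC statement fetch size makes the driver stream rows in
        // chunks instead of buffering the entire result set at once.
        .withFetchSize(50000));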

To compile the Java class and run its main method with arguments, I am executing the following command:

mvn compile exec:java \
-Dexec.mainClass=com.google.cloud.teleport.templates.MToBQ \
-Dexec.cleanupDaemonThreads=false \
-Dexec.args=" \
--project=dev-27 \
--region=australia-southeast1 \
--workerMachineType=n1-highmem-8 \
--workerDiskType=compute.googleapis.com/projects/dev-27/zones/australia-southeast1-c/diskTypes/pd-ssd \
--diskSizeGb=50 \
--stagingLocation=gs://dev-dataset/Data/stagingCustomDataFlow/MToBQ \
--tempLocation=gs://dev-dataset/Data/temp \
--templateLocation=gs://dev-dataset/Data/templatesCustomDataFlow/MToBQ/MToBQ.json \
--experiments=upload_graph \
--runner=DataflowRunner" **

Please let me know whether I am doing this correctly. What are the right parameters, and can Dataflow execute multiple pipelines in parallel?
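
For context, my current understanding is that the two read/write branches above run inside a single Dataflow job, because both are applied to the same Pipeline object. If I instead wanted two separate jobs running side by side, I believe I could create and run two Pipeline objects, since with DataflowRunner the run() call submits the job without blocking. A sketch of that idea (untested, with illustrative variable names):

// Sketch, untested: two Pipeline objects become two separate Dataflow jobs.
// With DataflowRunner, run() submits a job and returns immediately, so both
// jobs execute concurrently.
Pipeline abcPipeline = Pipeline.create(options);
// ... apply the abcc read and the STG_ABC write here ...
Pipeline xyzPipeline = Pipeline.create(options);
// ... apply the XYZ read and the STG_XYZ write here ...

PipelineResult abcResult = abcPipeline.run();
PipelineResult xyzResult = xyzPipeline.run();

// Optionally wait for both jobs to finish.
abcResult.waitUntilFinish();
xyzResult.waitUntilFinish();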
