What is the best way to transfer all records from a BigQuery table to a Cloud SQL table every day (the expected daily volume is over 255,801,312 records [about 255 million])? I know we can create a Dataflow pipeline from BQ to Cloud SQL, but with such a large volume of data it would run for hours and hours. Is there a better solution within Google Cloud?

1 Answer

Here is a working example of the workflow. You need to grant sufficient permissions to your Workflows service account (Cloud SQL Admin, BigQuery Data Viewer + Job User, Cloud Storage Admin), and the target table must already exist in your Cloud SQL instance (I tested with MySQL).
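For reference, here is a minimal sketch of what that target table could look like, matching the (id, email) columns exported by the query below. The database and table names come from the workflow, but the column types are assumptions; adapt them to your real data:

-- Minimal sketch: database/table names match the workflow below,
-- column types are assumed and should match your actual data.
CREATE TABLE test_schema.workflowimport (
  id    BIGINT,
  email VARCHAR(255)
);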

An article with more details is in the works. Replace the bucket, the projectid, the Cloud SQL instance name (mysql in my case), the query, the table name, and the database schema:

main:
  steps:
    - assignStep:
        assign:
          # Replace bucket and projectid with your own values
          - bucket: "TODO"
          - projectid: "TODO"
          - prefix: "workflow-import/export"
          - listResult:
              nextPageToken: ""
    - export-query:
        # Export the query result to Cloud Storage as sharded CSV files
        call: googleapis.bigquery.v2.jobs.query
        args:
          projectId: ${projectid}
          body:
            query: ${"EXPORT DATA OPTIONS( uri='gs://" + bucket + "/" + prefix + "*.csv', format='CSV', overwrite=true,header=false) AS SELECT id, email FROM `copy_dataset.name_test`"}
            useLegacySql: false
    - importfiles:
        # Import one page of exported files into Cloud SQL
        call: import_files
        args:
          pagetoken: ${listResult.nextPageToken}
          bucket: ${bucket}
          prefix: ${prefix}
          projectid: ${projectid}
        result: listResult
    - missing-files:
        # Loop until the object listing returns no more pages
        switch:
          - condition: ${"nextPageToken" in listResult}
            next: importfiles


import_files:
  params:
    - pagetoken
    - bucket
    - prefix
    - projectid
  steps:
    - list-files:
        # List one page of the exported CSV files
        call: googleapis.storage.v1.objects.list
        args:
          bucket: ${bucket}
          pageToken: ${pagetoken}
          prefix: ${prefix}
        result: listResult
    - process-files:
        # Import each file sequentially (Cloud SQL runs one import operation at a time)
        for:
          value: file
          in: ${listResult.items}
          steps:
            - wait-import:
                call: load_file
                args:
                  projectid: ${projectid}
                  importrequest:
                    importContext:
                      uri: ${"gs://" + bucket + "/" + file.name}
                      database: "test_schema"
                      fileType: CSV
                      csvImportOptions:
                        table: "workflowimport"
    - return-step:
        # Return the listing so the caller can continue with the next page
        return: ${listResult}


load_file:
  params: [importrequest, projectid]
  steps:
    - callImport:
        # Start the Cloud SQL import through the Admin API
        call: http.post
        args:
          url: ${"https://sqladmin.googleapis.com/v1/projects/" + projectid + "/instances/mysql/import"}
          auth:
            type: OAuth2
          body: ${importrequest}
        result: operation
    - checkoperation:
        # Poll the long-running operation until it is DONE
        switch:
          - condition: ${operation.body.status != "DONE"}
            next: wait
        next: completed
    - completed:
        return: "done"
    - wait:
        call: sys.sleep
        args:
          seconds: 5
        next: getoperation
    - getoperation:
        call: http.get
        args:
          url: ${operation.body.selfLink}
          auth:
            type: OAuth2
        result: operation
        next: checkoperation
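Once the target table exists and the placeholders are replaced, you can deploy and run the workflow with gcloud. A quick sketch, assuming the whole YAML above is saved as workflow.yaml; the workflow name and service account here are hypothetical placeholders:

# Hypothetical names: workflow.yaml, bq-to-cloudsql, and the service account
gcloud workflows deploy bq-to-cloudsql \
  --source=workflow.yaml \
  --service-account=YOUR_WORKFLOW_SA@YOUR_PROJECT.iam.gserviceaccount.com

gcloud workflows run bq-to-cloudsql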

More details in my Medium article.

Answered 2021-11-12T21:55:29.273