I need help/advice implementing the following use case.

I have 2 CSV files in a Google Cloud Storage bucket. I need to join these 2 files on a common column, and I need to save the output file back to a Google Cloud Storage bucket.

I need to achieve this with some Google Cloud solution (Cloud Dataflow with Apache Beam Python, Cloud Functions, or any other Cloud solution). As I am new to Google Cloud Platform, I would appreciate everyone's help with implementing this use case.

Looking forward to hearing from you.

Thanks in advance!

1 Answer

You have several ways to achieve this. If the result of the join is smaller than 1 GB and you only need 1 output file, you can do the following:

  • Query the external CSV files from BigQuery (federated query) and save the result in a temporary table, as shown below; a sketch of exporting the joined result back to Cloud Storage follows the queries.
CREATE OR REPLACE EXTERNAL TABLE mydataset.table1
OPTIONS (
  format = 'CSV',
  uris = ['gs://mybucket/file1.csv'],
  skip_leading_rows = 1
)

CREATE OR REPLACE EXTERNAL TABLE mydataset.table2
OPTIONS (
  format = 'CSV',
  uris = ['gs://mybucket/file2.csv'],
  skip_leading_rows = 1
)

CREATE TABLE mydataset.newtable
OPTIONS(
  expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
) AS
SELECT * 
FROM mydataset.table1 JOIN mydataset.table2 ON ....
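
To write the joined result back to Cloud Storage from this first approach, one option is BigQuery's EXPORT DATA statement. A minimal sketch, assuming (hypothetically) that the common column is named id and that the output should land under gs://mybucket/output/; note that EXPORT DATA requires a * wildcard in the URI:

EXPORT DATA OPTIONS (
  uri = 'gs://mybucket/output/joined-*.csv',  -- wildcard required by EXPORT DATA
  format = 'CSV',
  overwrite = true,
  header = true
) AS
SELECT t1.*, t2.* EXCEPT (id)  -- avoid duplicating the join column (hypothetical name: id)
FROM mydataset.table1 AS t1
JOIN mydataset.table2 AS t2 USING (id);

With a result below 1 GB this usually yields a single file, although EXPORT DATA does not strictly guarantee the file count.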

Otherwise, you can use the solution that I describe in this article (which I wrote).


EDIT 1

You can use this sample workflow definition for your needs.


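# Create external tables over the two source CSV files (each step submits a BigQuery query job)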
- loadFile1:
    call: http.post
    args:
      url: https://bigquery.googleapis.com/bigquery/v2/projects/<projectID>/jobs
      auth:
        type: OAuth2
      body:
        configuration:
          query:
            query: CREATE OR REPLACE EXTERNAL TABLE mydataset.table1 OPTIONS (format = 'CSV', uris = ['gs://mybucket/file1.csv'], skip_leading_rows = 1)
            useLegacySql: false
- loadFile2:
    call: http.post
    args:
      url: https://bigquery.googleapis.com/bigquery/v2/projects/<projectID>/jobs
      auth:
        type: OAuth2
      body:
        configuration:
          query:
            query: CREATE OR REPLACE EXTERNAL TABLE mydataset.table2 OPTIONS (format = 'CSV', uris = ['gs://mybucket/file2.csv'], skip_leading_rows = 1)
            useLegacySql: false
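# Join the two tables into a result table that expires after 1 hour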
- joinQuery:
    call: http.post
    args:
      url: https://bigquery.googleapis.com/bigquery/v2/projects/<projectID>/jobs
      auth:
        type: OAuth2
      body:
        configuration:
          query:
            query: CREATE TABLE mydataset.newtable OPTIONS( expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)) AS SELECT * ......
            useLegacySql: false
    result: queryResult
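# Poll the join job (getState -> testState -> waitAndGetState loop) until its status is DONE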
- getState:
    call: http.get
    args:
      url: ${"https://bigquery.googleapis.com/bigquery/v2/projects/<projectID>/jobs/" + queryResult.body.jobReference.jobId}
      auth:
        type: OAuth2
    result: jobState
    next: testState
- testState:
    switch:
      - condition: ${jobState.body.status.state == "DONE"}
        next: extractData
    next: waitAndGetState
- waitAndGetState:
    call: sys.sleep
    args:
      seconds: 1
    next: getState
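# Export the joined table to Cloud Storage as a CSV file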
- extractData:
    call: http.post
    args:
      url: https://bigquery.googleapis.com/bigquery/v2/projects/<projectID>/jobs
      auth:
        type: OAuth2
      body:
        configuration:
          extract:
            destinationUri: gs://<YourBucket>/bq-extract.csv
            destinationFormat: CSV
            sourceTable:
              projectId: <projectID>
              datasetId: mydataset
              tableId: newtable
    result: extractResult
- returnOutput:
    return: ${extractResult}

Then use Cloud Scheduler to directly call the Create Workflow Execution API, with an empty body {} and OAuth2 authentication.
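
As a sketch, the Cloud Scheduler job could be created with gcloud like this (the job name, schedule, region, workflow name, and service account are placeholder assumptions; the service account needs permission to run the workflow, e.g. roles/workflows.invoker):

# Trigger the workflow every day at 06:00 (placeholder schedule)
gcloud scheduler jobs create http join-csv-files \
  --schedule="0 6 * * *" \
  --uri="https://workflowexecutions.googleapis.com/v1/projects/<projectID>/locations/<region>/workflows/<workflowName>/executions" \
  --message-body="{}" \
  --oauth-service-account-email=<serviceAccountEmail>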

Answered 2021-03-26T09:25:42.737