我需要帮助/建议来实现以下用例。
我在 Google Cloud Storage 存储桶中有 2 个 csv 文件,我需要基于一个公共列加入这 2 个文件,并且我需要将输出文件保存回 Google Cloud Storage 存储桶。
我需要使用任何谷歌云解决方案(使用梁 python 的云数据流)、云函数或任何其他云解决方案来实现这一点,因为我是谷歌云平台的新手,我请求大家帮助我实现这个用例。
期待您的回音
提前致谢
我需要帮助/建议来实现以下用例。
我在 Google Cloud Storage 存储桶中有 2 个 csv 文件,我需要基于一个公共列加入这 2 个文件,并且我需要将输出文件保存回 Google Cloud Storage 存储桶。
我需要使用任何谷歌云解决方案(使用梁 python 的云数据流)、云函数或任何其他云解决方案来实现这一点,因为我是谷歌云平台的新手,我请求大家帮助我实现这个用例。
期待您的回音
提前致谢
您有几种方法可以实现这一目标。如果合并的结果小于 1Gb,并且您只需要 1 个输出文件,您可以这样做
CREATE OR REPLACE EXTERNAL TABLE mydataset.table1
OPTIONS (
format = 'CSV',
uris = ['gs://mybucket/file1.csv'],
skip_leading_rows = 1
)
CREATE OR REPLACE EXTERNAL TABLE mydataset.table2
OPTIONS (
format = 'CSV',
uris = ['gs://mybucket/file2.csv'],
skip_leading_rows = 1
)
CREATE TABLE mydataset.newtable
OPTIONS(
expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
) AS
SELECT *
FROM mydataset.table1 join mydataset.table2 ON ....
否则,您可以使用我在本文中描述的解决方案(我写的)
编辑 1
您可以使用此工作流定义示例来满足您的需要。
- loadFile1:
call: http.post
args:
url: https://bigquery.googleapis.com/bigquery/v2/projects/<projectID>/jobs
auth:
type: OAuth2
body:
configuration:
query:
query: CREATE OR REPLACE EXTERNAL TABLE mydataset.table1 OPTIONS (format = 'CSV', uris = ['gs://mybucket/file1.csv'], skip_leading_rows = 1)
useLegacySql: false
- loadFile2:
call: http.post
args:
url: https://bigquery.googleapis.com/bigquery/v2/projects/<projectID>/jobs
auth:
type: OAuth2
body:
configuration:
query:
query: CREATE OR REPLACE EXTERNAL TABLE mydataset.table2 OPTIONS (format = 'CSV', uris = ['gs://mybucket/file2.csv'], skip_leading_rows = 1)
useLegacySql: false
- joinQuery:
call: http.post
args:
url: https://bigquery.googleapis.com/bigquery/v2/projects/<projectID>/jobs
auth:
type: OAuth2
body:
configuration:
query:
query: CREATE TABLE mydataset.newtable OPTIONS( expiration_timestamp=TIMESTAMP_ADD(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)) AS SELECT * ......
useLegacySql: false
result: queryResult
- getState:
call: http.get
args:
url: ${"https://bigquery.googleapis.com/bigquery/v2/projects/<projectID>/jobs/" + queryResult.body.jobReference.jobId}
auth:
type: OAuth2
result: jobState
next: testState
- testState:
switch:
- condition: ${jobState.body.status.state == "DONE"}
next: extractData
next: waitAndGetState
- waitAndGetState:
call: sys.sleep
args:
seconds: 1
next: getState
- extractData:
call: http.post
args:
url: https://bigquery.googleapis.com/bigquery/v2/projects/<projectID>/jobs
auth:
type: OAuth2
body:
configuration:
extract:
destinationUri: gs://<YourBucket>/bq-extract.csv
destinationFormat: CSV
sourceTable:
projectId: <projectID>
datasetId: mydataset
tableId: newtable
result: extractResult
- returnOutput:
return: ${extractResult}
然后,使用 Cloud Scheduler 直接调用Create Workflow 执行 API,主体为空{},OAuth2 认证方式。