
Question

Is there a more efficient way to streamline uploading CSV files to BigQuery, either from a Python script or by any other means?

Description

I have 1528596 CSV files to upload to BigQuery [the tables have already been created]. My current approach is proving to be slow, and I think that is because of Google BigQuery's upload quotas. Exceeding the quota gives me the following exception:

Traceback (most recent call last):
  File “name_of_file.py", line 220, in <module>
  File "name_of_file.py", line 122, in upload_csv_to_bigquery
    job.result()  # Waits for table load to complete.
  File "/home/bongani/.local/lib/python3.6/site-packages/google/cloud/bigquery/job.py", line 660, in result
    return super(_AsyncJob, self).result(timeout=timeout)
  File "/home/bongani/.local/lib/python3.6/site-packages/google/api_core/future/polling.py", line 120, in result
    raise self._exception
google.api_core.exceptions.Forbidden: 403 Quota exceeded: Your project exceeded quota for imports per project. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors

I have emailed Google support to try to get the quota increased, but they replied that they are unable to do so.

My current implementation:

import os
import time
from concurrent.futures import ProcessPoolExecutor, as_completed

from google.cloud import bigquery
from google.cloud.bigquery import LoadJobConfig

root_dir = "/path/to/some/directory"
dataset_id = 'dataset_namex'

bigquery_client = bigquery.Client()


def upload_csv_to_bigquery(table_name, csv_full_path):
    s = time.time()
    load_config = LoadJobConfig()
    load_config.skip_leading_rows = 1
    table_ref = bigquery_client.dataset(dataset_id).table(table_name)
    with open(csv_full_path, 'rb') as source_file:
        job = bigquery_client.load_table_from_file(source_file, table_ref, job_config=load_config)  # API request
        job.result()  # Waits for table load to complete.
    print(f"upload time: {time.time() - s}")


def run():
    with ProcessPoolExecutor(max_workers=30) as process_executor:
        futures = []
        for csvfile in os.listdir(root_dir):
            table_name = csvfile.split('_')[-1]
            futures.append(process_executor.submit(upload_csv_to_bigquery, table_name, os.path.join(root_dir, csvfile)))
        for future in as_completed(futures):
            future.result()
    print("DONE!!!")


if __name__ == '__main__':
    run()

This image shows the number of upload requests per second I am making (metrics from Google Cloud Platform).


1 Answer


Write a script that reads your CSVs row by row and uploads them with streaming inserts. Streaming inserts are limited to 100k rows per second or 100 MB per second, whichever you hit first.

There is no rate limit on the number of bigquery.tabledata.insertAll API calls, so streaming is better suited to uploading a large number of small files than load jobs, which count against the bigquery.tables.insert quota you are hitting.
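
A minimal sketch of that approach, reusing the root_dir / dataset_id layout from the question's script (the BATCH_SIZE value and the plain csv.reader parsing are assumptions; streamed rows must match the table schema, so numeric columns may need explicit casting):

import csv
import os

from google.cloud import bigquery

root_dir = "/path/to/some/directory"
dataset_id = 'dataset_namex'
BATCH_SIZE = 500  # rows per insertAll request; keep each request well under the API's size limit

bigquery_client = bigquery.Client()


def stream_csv_to_bigquery(table_name, csv_full_path):
    table_ref = bigquery_client.dataset(dataset_id).table(table_name)
    table = bigquery_client.get_table(table_ref)  # fetch the table so its schema is available
    field_names = [field.name for field in table.schema]

    with open(csv_full_path, newline='') as source_file:
        reader = csv.reader(source_file)
        next(reader)  # skip the header row (the load jobs used skip_leading_rows=1)
        batch = []
        for row in reader:
            batch.append(dict(zip(field_names, row)))
            if len(batch) >= BATCH_SIZE:
                errors = bigquery_client.insert_rows(table, batch)  # streaming insert (tabledata.insertAll)
                if errors:
                    print(f"insert errors in {csv_full_path}: {errors}")
                batch = []
        if batch:  # flush the remaining rows
            errors = bigquery_client.insert_rows(table, batch)
            if errors:
                print(f"insert errors in {csv_full_path}: {errors}")


for csvfile in os.listdir(root_dir):
    stream_csv_to_bigquery(csvfile.split('_')[-1], os.path.join(root_dir, csvfile))

If a single process cannot push rows fast enough, the same ProcessPoolExecutor fan-out from the question can wrap stream_csv_to_bigquery, since the per-call rate limit no longer applies.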

Answered 2018-06-16T22:24:00.850