Question
Is there a more efficient way to streamline uploading CSV files to BigQuery, from a Python script or by any other means?
Description
I have 1,528,596 CSV files to upload to BigQuery [the tables are already created]. My current approach is proving very slow, and I believe this is because of the Google BigQuery upload quota. Exceeding the quota gives me the following exception:
Traceback (most recent call last):
File “name_of_file.py", line 220, in <module>
File "name_of_file.py", line 122, in upload_csv_to_bigquery
job.result() # Waits for table load to complete.
File "/home/bongani/.local/lib/python3.6/site-packages/google/cloud/bigquery/job.py", line 660, in result
return super(_AsyncJob, self).result(timeout=timeout)
File "/home/bongani/.local/lib/python3.6/site-packages/google/api_core/future/polling.py", line 120, in result
raise self._exception
google.api_core.exceptions.Forbidden: 403 Quota exceeded: Your project exceeded quota for imports per project. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors
I have emailed Google support to try to get the quota increased, but they replied that they cannot do so.
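Since the 403 is about the number of imports (load jobs) per project rather than bandwidth, every fix I can think of comes down to issuing fewer, larger load jobs. One idea is to group the local CSVs by destination table and concatenate each group into a single stream, so that one load job covers many files. A minimal sketch of that idea, assuming all files for a given table share the same schema and a single header row (the grouping rule mirrors the filename convention in my code further down; load_batched_csvs is just an illustrative name, not code I am running):

import io
import os
from collections import defaultdict

from google.cloud import bigquery

root_dir = "/path/to/some/directory"
dataset_id = 'dataset_namex'
client = bigquery.Client()

def load_batched_csvs():
    # Group local CSVs by destination table, using the same naming rule as my current code.
    files_by_table = defaultdict(list)
    for csvfile in os.listdir(root_dir):
        files_by_table[csvfile.split('_')[-1]].append(os.path.join(root_dir, csvfile))

    for table_name, paths in files_by_table.items():
        # Concatenate every CSV for this table into one in-memory stream, dropping each
        # file's header, so the whole group becomes a single load job.
        # (In practice the batch size would need a cap to bound memory use.)
        buffer = io.BytesIO()
        for path in paths:
            with open(path, 'rb') as f:
                next(f)                # skip the header row of every file
                buffer.write(f.read())
        buffer.seek(0)

        load_config = bigquery.LoadJobConfig()
        load_config.source_format = bigquery.SourceFormat.CSV
        table_ref = client.dataset(dataset_id).table(table_name)
        job = client.load_table_from_file(buffer, table_ref, job_config=load_config)
        job.result()                   # one load job per table instead of one per file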
For reference, my current implementation, which issues one load job per file:
import os
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
from google.cloud import bigquery
from google.cloud.bigquery import LoadJobConfig
root_dir = "/path/to/some/directory"
dataset_id = 'dataset_namex'
bigquery_client = bigquery.Client()
def upload_csv_to_bigquery(table_name, csv_full_path):
    s = time.time()
    load_config = LoadJobConfig()
    load_config.skip_leading_rows = 1
    table_ref = bigquery_client.dataset(dataset_id).table(table_name)
    with open(csv_full_path, 'rb') as source_file:
        job = bigquery_client.load_table_from_file(source_file, table_ref, job_config=load_config)  # API request
        job.result()  # Waits for table load to complete.
    print(f"upload time: {time.time() - s}")

def run():
    with ProcessPoolExecutor(max_workers=30) as process_executor:
        futures = []
        for csvfile in os.listdir(root_dir):
            table_name = csvfile.split('_')[-1]
            futures.append(process_executor.submit(upload_csv_to_bigquery, table_name,
                                                   os.path.join(root_dir, csvfile)))  # join handles the trailing slash
        for future in as_completed(futures):
            future.result()
        print("DONE!!!")

run()
This chart, taken from the Google Cloud Platform metrics, shows the number of upload requests I make per second.
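The other direction I am aware of is staging the files in Cloud Storage first (for example with gsutil -m cp) and then pointing one load job per table at a wildcard URI, since a single load_table_from_uri call can cover every file matching the pattern. A rough sketch, assuming the CSVs for each table have already been copied under a per-table prefix in a bucket (the bucket name and layout here are placeholders):

from google.cloud import bigquery

dataset_id = 'dataset_namex'
client = bigquery.Client()

def load_table_from_gcs(table_name):
    # A wildcard URI is still a single load job, however many files it matches.
    uri = f"gs://my-staging-bucket/{table_name}/*.csv"

    load_config = bigquery.LoadJobConfig()
    load_config.skip_leading_rows = 1
    load_config.source_format = bigquery.SourceFormat.CSV

    table_ref = client.dataset(dataset_id).table(table_name)
    job = client.load_table_from_uri(uri, table_ref, job_config=load_config)
    job.result()

Would either of these be the recommended way to bring the number of load jobs back under the quota, or is there a better approach I am missing?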