0

我是大查询和学习它的新手。我有一个要求,我必须将近 300 个表从 oracle 源加载到 Big Query 临时表。加载数据的推荐方法是什么?我知道我可以为此使用数据流,但我必须为其创建 300 个数据流任务还是创建单个作业来迭代它?请分享您的经验和不同的方法。非常感谢。

问候,文卡特。

4

2 回答 2

0

根据我的经验,我们想将我们的数据仓库迁移到 bigquery 中,我没有使用数据流或任何工具,我只是将表导出到 csv 文件中,然后使用 python 代码遍历文件并将它们上传到 bigquery https:// cloud.google.com/bigquery/docs/loading-data-local#python

或者您可以将它们上传到 gcs,然后再上传到 bigquery,如果这是日常操作我认为维护一个迭代表列表的单个代码将它们提取并将它们附加到 bigquery 表中比创建 300 个任务更容易

更新:

使用 pandas-gbq 从 oracle 读取数据到 bigquery 的示例代码:

import cx_Oracle
from sqlalchemy import create_engine

engine = create_engine('oracle://user:password@host_or_scan_address:1521/ORACLE_SERVIVE_NAME')

results = pd.read_sql('select * from table_name', engine,chunk_size= 5000)
if_exists = 'append' # or replace
schema = [] #specify bq_schema here if you don't want the autodetect schema
for result in results:
     result.to_gbq(destination_table='dataset_id.table_id', project_id='project-id',
                        table_schema=schema, if_exists=if_exists)

如果要将数据作为一个块加载,则可以删除 chunk_size 参数,但如果表很大,这可能会消耗内存



results = pd.read_sql('select * from table_name')
if_exists = 'append' # or replace
schema = [] #specify bq_schema here if you don't want the autodetect schema
results.to_gbq(destination_table='dataset_id.table_id', project_id='project-id',
                        table_schema=schema, if_exists=if_exists)

于 2020-06-25T02:34:06.133 回答
0

我的建议是提取文件中的 Oracle 表内容(例如 CSV 格式)。将文件复制到 Cloud Storage。然后将它们加载到 BigQuery 中。

如果您想要做的转换是 SQL,那么数据流是无用的(昂贵、效率较低、需要更多时间)。

但是,如果您需要请求外部 API(用于数据转换,例如 ML API),或者如果您想将数据下沉到 BigQuery 之外的另一个数据库(Firestore、BigTable、Cloud SQL...),数据流是正确的工具

编辑

为了更深入地了解,我假设这些表在同一个数据集中。然后,代码很简单

def hello_gcs_generic(data, context):

    client = bigquery.Client()
    dataset_id = 'my_dataset'

    bucket = data['bucket']
    path = data['name']

    table_name = path[path.rfind('/')+1:path.rfind(('.'))]

    dataset_ref = client.dataset(dataset_id)
    job_config = bigquery.LoadJobConfig(
        autodetect=True,
        skip_leading_rows=1,
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field="date"  # Name of the column to use for partitioning.
        ),
        source_format=bigquery.SourceFormat.CSV
    )

    uri = "gs://{}/{}".format(bucket,path)

    load_job = client.load_table_from_uri(
        uri, dataset_ref.table(table_name), job_config=job_config
    )  # API request
    print("Starting job {}".format(load_job.job_id))
    
    load_job.result()  # Waits for table load to complete.
    print("Job finished.")

在这里,对存储桶中的每个文件都调用 Cloud Functions。因此,如果同时drop 300个文件,会触发300个函数,并行执行。

几点:

  • 表名等于文件名。
  • 默认情况下,行为是写入附加(将数据添加到表中。
  • 我强烈建议您在一个字段上对您的表进行分区,这里是一个日期字段。
  • 加载作业通常很快。如果需要超过 9 分钟,请使用 Cloud Run(限制为 15 分钟)或不要等待加载结束(删除此load_job.result()
  • CSV 从 1 个前导行开始。不是强制性的,但在模式自动检测的情况下,这有助于很好地命名列。

注意:我假设放入 Cloud Storage 的所有文件都必须集成到 BigQuery 中。如果没有,您可以按照我的一篇文章中的描述添加过滤器

于 2020-06-25T07:23:09.913 回答