0

i am trying to load multiple csv files into table by below code but it is failing: Can anyone let me know where i am wrong: ##################

def csv_loader(data, context):
        client = bigquery.Client()
        dataset_id = os.environ['DATASET']
        dataset_ref = client.dataset(dataset_id)
        job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        field_delimiter="|",
        write_disposition="WRITE_TRUNCATE",
        skip_leading_rows=1,
         )

        # get the URI for uploaded CSV in GCS from 'data'
        uri = 'gs://' + os.environ['BUCKET'] + '/' + data['name']
        # lets do this
        load_job = client.load_table_from_uri(
                uri,
                dataset_ref.table(os.environ['TABLE'])

        load_job.result()  # wait for table load to complete.
        print('Job finished.')
        destination_table = client.get_table(dataset_ref.table(os.environ['TABLE']))
        print('Loaded {} rows.'.format(destination_table.num_rows))

###################

Above works fine if schema is mentioned for one file but with multiple files gives error.Can anyone tell me what I am doing wrong

4

1 回答 1

2

根据您的最后评论,架构不同,您想使用架构自动检测。但是,我在您的代码中没有看到该标志,也没有看到您在作业加载方法中传递了 job_config 变量。

尝试如下:

注意:我在变量中添加了标志autodetect=True,并且还在load_table_from_uri()函数中job_config传递了job_config变量。

def csv_loader(data, context):
        client = bigquery.Client()
        dataset_id = os.environ['DATASET']
        dataset_ref = client.dataset(dataset_id)
        job_config = bigquery.LoadJobConfig(
        autodetect=True,
        source_format=bigquery.SourceFormat.CSV,
        field_delimiter="|",
        write_disposition="WRITE_TRUNCATE",
        skip_leading_rows=1,
         )

        # get the URI for uploaded CSV in GCS from 'data'
        uri = 'gs://' + os.environ['BUCKET'] + '/' + data['name']
        # lets do this
        load_job = client.load_table_from_uri(
                uri,
                dataset_ref.table(os.environ['TABLE'],
                job_config=job_config
               )

        load_job.result()  # wait for table load to complete.
        print('Job finished.')
        destination_table = client.get_table(dataset_ref.table(os.environ['TABLE']))
        print('Loaded {} rows.'.format(destination_table.num_rows))
于 2021-08-09T13:38:22.567 回答