1

BigQuery Python: google.api_core.exceptions.BadRequest: 400 Error while reading data, error message: Schema mismatch: referenced variable 'ro_sub_ros.$is_not_null' 的数组级别为 1,而 Parquet 列的对应字段路径有 0 个重复字段。

我的原始数据如下所示:

testData = {
    "ro_user_email": "tech@techietech.com",
    "ro_account_id": "23402042",
    "ro_sub_account_id": "34020334",
    "ro_name": "Test RO",
    "ro_number": "1304340",
    "ro_currency": {"label":"USD","value":"USD"},
    "ro_dates": {"from":now,"to":now},
    "ro_status": "draft",
    "ro_operation_timestamp": pd.Timestamp(now),
    "ro_billing_cycle": {"label":"Fortnightly","value":"Fortnightly"},
    "ro_sub_ros": [
        {
            "sub_ro_id": "2323",
            "valid":False,
            "sub_ro_name": "Testing",
            "sub_ro_dates":{"from":now,"to":now},
            "sub_ro_budget": 1203302.22,
            "sub_ro_revenue_price":1202302.22,
            "sub_ro_revenue_selected": {"label":"Fortnightly","value":"Fortnightly"},
            "sub_ro_revenue_model_selected": {"label":"Fortnightly","value":"Fortnightly"},
            "sub_ro_campaigns_selected": [{"label":"Fortnightly","value":"Fortnightly"}],
            "sub_ro_ios_selected": [{"label":"Fortnightly","value":"Fortnightly"}],
            "sub_ro_client_id": [{"label":"Fortnightly","value":"Fortnightly"}],
            "sub_ro_ids_selected": [{"label":"Fortnightly","value":"Fortnightly"}],
            "sub_ro_pixels_selected": [{"label":"Fortnightly","value":"Fortnightly"}],
            "kpi_1_metric_selected": {"label":"Fortnightly","value":"Fortnightly"},
            "attribution_model_selected": {"label":"Fortnightly","value":"Fortnightly"},
            "kpi_window_selected": {"label":"Fortnightly","value":"Fortnightly"},
            "deepMetrics_selected": {"label":"Fortnightly","value":"Fortnightly"},
            "sub_ro_kpi_goal":"ROI"

        }
    ],

}

以下是我创建 BQ Schema 的方式:

schema = [
        bigquery.SchemaField("ro_user_email", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("ro_account_id", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("ro_sub_account_id", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("ro_name", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("ro_number", "STRING", mode="REQUIRED"),
        bigquery.SchemaField("ro_currency", 
        "STRUCT", 
        mode="REQUIRED",
        fields=[
            bigquery.SchemaField("label", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("value", "STRING", mode="REQUIRED"),
            ]
            ),

        bigquery.SchemaField("ro_dates", 
        "STRUCT", 
        mode="REQUIRED",
        fields=[
            bigquery.SchemaField("from", "DATE", mode="REQUIRED"),
            bigquery.SchemaField("to", "DATE", mode="REQUIRED"),
            ]
            ),

        bigquery.SchemaField("ro_status","STRING", mode="REQUIRED"),
        bigquery.SchemaField("ro_operation_timestamp","TIMESTAMP", mode="REQUIRED"),

        bigquery.SchemaField("ro_billing_cycle", 
        "STRUCT", 
        mode="REQUIRED",
        fields=[
            bigquery.SchemaField("label", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("value", "STRING", mode="REQUIRED"),
            ]
            ),
        
        bigquery.SchemaField(
        "ro_sub_ros",
        "RECORD",
        mode="REPEATED",
        fields=[
            bigquery.SchemaField("sub_ro_id", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("valid", "BOOL", mode="REQUIRED"),
            bigquery.SchemaField("sub_ro_name", "STRING", mode="REQUIRED"),
            bigquery.SchemaField("sub_ro_dates", "STRUCT", mode="REQUIRED",
            fields=[
                bigquery.SchemaField("from", "DATE", mode="REQUIRED"),
                bigquery.SchemaField("to", "DATE", mode="REQUIRED"),
                ]
                ),
            bigquery.SchemaField("sub_ro_budget", "FLOAT", mode="REQUIRED"),
            bigquery.SchemaField("sub_ro_revenue_price", "FLOAT", mode="REQUIRED"),
            bigquery.SchemaField("sub_ro_revenue_selected",
            "STRUCT", mode="REQUIRED",
            fields=[
                bigquery.SchemaField("label", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("value", "STRING", mode="REQUIRED"),

            ]
            ),
            bigquery.SchemaField("sub_ro_revenue_model_selected",
            "STRUCT", mode="REQUIRED",
            fields=[
                bigquery.SchemaField("label", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("value", "STRING", mode="REQUIRED"),

            ]
            ),
            bigquery.SchemaField("sub_ro_campaigns_selected","RECORD",
            mode="REPEATED",
            fields=[
                bigquery.SchemaField("model_list",
            "STRUCT", mode="REQUIRED",
            fields=[
                bigquery.SchemaField("label", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("value", "STRING", mode="REQUIRED"),
            ]
            )
            ]),

            bigquery.SchemaField("sub_ro_ios_selected","RECORD",
            mode="REPEATED",
            fields=[
                bigquery.SchemaField("model_list",
            "STRUCT", mode="REQUIRED",
            fields=[
                bigquery.SchemaField("label", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("value", "STRING", mode="REQUIRED"),
            ]
            )
            ]),

            bigquery.SchemaField("sub_ro_client_id","RECORD",
            mode="REPEATED",
            fields=[
                bigquery.SchemaField("model_list",
            "STRUCT", mode="REQUIRED",
            fields=[
                bigquery.SchemaField("label", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("value", "STRING", mode="REQUIRED"),
            ]
            )
            ]),

            #

            bigquery.SchemaField("sub_ro_ids_selected","RECORD",
            mode="REPEATED",
            fields=[
                bigquery.SchemaField("model_list",
            "STRUCT", mode="REQUIRED",
            fields=[
                bigquery.SchemaField("label", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("value", "STRING", mode="REQUIRED"),
            ]
            )
            ]),

            bigquery.SchemaField("sub_ro_pixels_selected","RECORD",
            mode="REPEATED",
            fields=[
                bigquery.SchemaField("model_list",
            "STRUCT", mode="REQUIRED",
            fields=[
                bigquery.SchemaField("label", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("value", "STRING", mode="REQUIRED"),
            ]
            )
            ]),

            bigquery.SchemaField("kpi_1_metric_selected",
            "STRUCT", mode="REQUIRED",
            fields=[
                bigquery.SchemaField("label", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("value", "STRING", mode="REQUIRED"),

            ]
            ),

            bigquery.SchemaField("attribution_model_selected",
            "STRUCT", mode="REQUIRED",
            fields=[
                bigquery.SchemaField("label", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("value", "STRING", mode="REQUIRED"),

            ]
            ),

            bigquery.SchemaField("kpi_window_selected",
            "STRUCT", mode="REQUIRED",
            fields=[
                bigquery.SchemaField("label", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("value", "STRING", mode="REQUIRED"),

            ]
            ),

            bigquery.SchemaField("deepMetrics_selected",
            "STRUCT", mode="REQUIRED",
            fields=[
                bigquery.SchemaField("label", "STRING", mode="REQUIRED"),
                bigquery.SchemaField("value", "STRING", mode="REQUIRED"),

            ]
            ),

            bigquery.SchemaField("sub_ro_kpi_goal", "STRING", mode="REQUIRED"),




        ],
    )
    ]

当我尝试使用上传此数据时,bigquery client library出现此错误:

job_config = bigquery.LoadJobConfig(schema=schema)
    return bq.client.load_table_from_dataframe(
        df, tablename, job_config=job_config
    ).result()

抛出:

google.api_core.exceptions.BadRequest: 400 Error while reading data, error message: Schema mismatch: referenced variable 'ro_sub_ros.$is_not_null' has array levels of 1, while the corresponding field path to Parquet column has 0 repeated 
fields.

不知道这里出了什么问题,如果我的架构太大而无法分析,有人可以展示一个REPEATED RECORD使用客户端库和熊猫数据框在谷歌大查询中上传的最小示例吗?

4

1 回答 1

1

您可以考虑验证这些选项。

验证 BigQuery 架构是否正确,这是一个使用重复记录的示例。可以看官方文档

# from google.cloud import bigquery
# client = bigquery.Client()
# project = client.project
# dataset_ref = bigquery.DatasetReference(project, 'my_dataset')
 
schema = [
    bigquery.SchemaField("id", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("first_name", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("last_name", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("dob", "DATE", mode="NULLABLE"),
    bigquery.SchemaField(
        "addresses",
        "RECORD",
        mode="REPEATED",
        fields=[
            bigquery.SchemaField("status", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("address", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("city", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("state", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("zip", "STRING", mode="NULLABLE"),
            bigquery.SchemaField("numberOfYears", "STRING", mode="NULLABLE"),
        ],
    ),
]
table_ref = dataset_ref.table("my_table")
table = bigquery.Table(table_ref, schema=schema)
table = client.create_table(table)  # API request
print("Created table {}".format(table.full_table_id))

验证记录语法是否正确。是一个带有模式值的示例。

{"id":"1","first_name":"John","last_name":"Doe","dob":"1968-01-22","addresses":[{"status":"current","address":"123 First Avenue","city":"Seattle","state":"WA","zip":"11111","numberOfYears":"1"},{"status":"previous","address":"456 Main Street","city":"Portland","state":"OR","zip":"22222","numberOfYears":"5"}]}

{"id":"2","first_name":"Jane","last_name":"Doe","dob":"1980-10-16","addresses":[{"status":"current","address":"789 Any Avenue","city":"New York","state":"NY","zip":"33333","numberOfYears":"2"},{"status":"previous","address":"321 Main Street","city":"Hoboken","state":"NJ","zip":"44444","numberOfYears":"3"}]}

考虑在你的 python 代码中使用“自动检测模式”。类似于这个例子。您可以查看更多文档

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of the table to create.
# table_id = "your-project.your_dataset.your_table_name

# Set the encryption key to use for the destination.
# TODO: Replace this key with a key you have created in KMS.
# kms_key_name = "projects/{}/locations/{}/keyRings/{}/cryptoKeys/{}".format(
#     "cloud-samples-tests", "us", "test", "test"
# )
job_config = bigquery.LoadJobConfig(
    autodetect=True, source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
)
uri = "gs://cloud-samples-data/bigquery/us-states/us-states.json"
load_job = client.load_table_from_uri(
    uri, table_id, job_config=job_config
)  # Make an API request.
load_job.result()  # Waits for the job to complete.
destination_table = client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))

您可以在此页面中验证 JSON 格式。

于 2021-10-11T22:10:42.640 回答