0

great_expectations中,我正在尝试将检查点添加到上下文中。这批数据是指存储在 s3 上的 csv 文件,其中有一个半列作为分隔符。我正在使用 PySpark 作为连接器加载批次。我尝试使用以下代码:

首先,我定义了一个批处理请求来检索数据。这里推荐使用batch_spec_passthrough来指定分隔符:

batch_request = {
    "datasource_name": "my_datasource",
    "data_connector_name": "default_inferred_data_connector_name",
    "data_asset_name": "my_data_asset",
    "batch_spec_passthrough": {'reader_options': {'sep': ';', 'header': 'true', 'inferSchema': 'true'}},
}
br = BatchRequest(**batch_request)

然后我指定了我想使用的期望套件:

expectation_suite_name = "my_expectation_suite"

此时我定义了检查点:

checkpoint_name = "my_checkpoint"
checkpoint_config = {
    "class_name": "SimpleCheckpoint",
    "validations": [
        {
            "batch_request": br,
            "expectation_suite_name": expectation_suite_name
        }
    ]
}
checkpoint = SimpleCheckpoint(
    f"{checkpoint_name}__{expectation_suite_name}",
    context,
    **checkpoint_config
)

最后,我将检查点添加到数据上下文中,然后运行它。

checkpoint_json = checkpoint.get_config().to_json_dict()
context.add_checkpoint(**checkpoint_json)
context.run_checkpoint(checkpoint_name=checkpoint_name)

问题是当我运行检查点时,我得到一个错误,表明批处理数据没有以正确的方式加载。没有任何期望起作用,因为似乎没有使用半列作为分隔符加载批处理数据,因此产生了单列数据框。这是我在运行检查点时看到的错误之一:

"exception_message": "错误:BatchData 中的列 "GR" 不存在。"

但是,当我不将检查点添加到上下文中而只是运行检查点时,一切都很好。所以以下命令有效:

checkpoint_result = checkpoint.run()

有什么帮助吗?

4

0 回答 0