在great_expectations中,我正在尝试将检查点添加到上下文中。这批数据是指存储在 s3 上的 csv 文件,其中有一个半列作为分隔符。我正在使用 PySpark 作为连接器加载批次。我尝试使用以下代码:
首先,我定义了一个批处理请求来检索数据。这里推荐使用batch_spec_passthrough来指定分隔符:
batch_request = {
"datasource_name": "my_datasource",
"data_connector_name": "default_inferred_data_connector_name",
"data_asset_name": "my_data_asset",
"batch_spec_passthrough": {'reader_options': {'sep': ';', 'header': 'true', 'inferSchema': 'true'}},
}
br = BatchRequest(**batch_request)
然后我指定了我想使用的期望套件:
expectation_suite_name = "my_expectation_suite"
此时我定义了检查点:
checkpoint_name = "my_checkpoint"
checkpoint_config = {
"class_name": "SimpleCheckpoint",
"validations": [
{
"batch_request": br,
"expectation_suite_name": expectation_suite_name
}
]
}
checkpoint = SimpleCheckpoint(
f"{checkpoint_name}__{expectation_suite_name}",
context,
**checkpoint_config
)
最后,我将检查点添加到数据上下文中,然后运行它。
checkpoint_json = checkpoint.get_config().to_json_dict()
context.add_checkpoint(**checkpoint_json)
context.run_checkpoint(checkpoint_name=checkpoint_name)
问题是当我运行检查点时,我得到一个错误,表明批处理数据没有以正确的方式加载。没有任何期望起作用,因为似乎没有使用半列作为分隔符加载批处理数据,因此产生了单列数据框。这是我在运行检查点时看到的错误之一:
"exception_message": "错误:BatchData 中的列 "GR" 不存在。"
但是,当我不将检查点添加到上下文中而只是运行检查点时,一切都很好。所以以下命令有效:
checkpoint_result = checkpoint.run()
有什么帮助吗?