1

我目前正在尝试使用 Python 数据验证包“Great Expectations”。

我目前正在使用 GreatExpectationsOperator 调用特定数据源(PostgreSQL 数据源)上的期望套件。

my_ge_task = GreatExpectationsOperator(
    task_id='my_task',
    expectation_suite_name='suite.error',
    batch_kwargs={
        'table': 'data_quality',
        'datasource': 'data_quality_datasource',
        'query': "SELECT * FROM data_qualityWHERE batch='abc';"
    },
    data_context_root_dir=ge_root_dir
)

我想弄清楚的是如何存储和获取我的数据源凭据。对于使用 PostgreSQL 的其他操作,我一直在使用 PostgreSQL 连接来存储数据库凭据并使用 PostgreSQL 挂钩与数据库进行交互。然而,由于期望很高,postgreSQL 连接详细信息存储在 config_variables.yaml 中的 Great Expectations 上下文中。我在创建我的 dockerfile 时尝试使用 ENV 变量并将它们用作凭据并且它可以工作,但我试图找到一种更清洁的方法,可能使用我现有的 PostgreSQL 连接详细信息用于数据源。

网上似乎没有太多关于如何完成此操作的详细信息,因此非常感谢任何帮助。

谢谢,

4

1 回答 1

0

一种可能的解决方法是在 PythonOperator 中使用 GreatExpectationOperator,以便在运行 GE 之前,脚本从 Airflow Connection 中提取连接数据并将其保存为环境变量。

像这样的东西:

import os

from airflow.hooks.base import BaseHook

from great_expectations_provider.operators.great_expectations import (
    GreatExpectationsOperator,
)

def get_ge_runner(task_id, checkpoint_name, connection_name):
    def run_ge(ds, **kwargs):
        connection = BaseHook.get_connection(connection_name)
        os.environ[
            "my_db_creds"
        ] = f"postgresql+psycopg2://{connection.login}:{connection.password}@{connection.host}:{connection.port}/{connection.schema}"
        op = GreatExpectationsOperator(
            task_id=task_id,
            data_context_root_dir=ge_root_dir,
            run_name=task_id,
            checkpoint_name=checkpoint_name,
        )
        op.execute(kwargs)

    return run_great_expectations
于 2022-02-04T19:45:10.120 回答