0

我有一些使用 SnowflakeOperator 和 SnowflakeHook 的 DAG。他们都使用snowflake_connection输入连接到雪花,我保存Admin > Connections在 Airflow 下。

SnowflakeHook(
    snowflake_conn_id="snowflake_connection",
    database='SOME_DB',
    schema='PUBLIC'
)

虽然这种安排在 Airflow 环境中有效,但我想在我的定位机器中将代码作为常规脚本运行。但它无法访问snowflake_connection我保存在 Airflow 中的那个。有什么解决方法吗?

有没有办法可以直接提供连接字符串或用户名/密码SnowflakeHookSnowflakeOperator建立数据库连接?

4

1 回答 1

0

您可以模拟连接。


conn = Connection(
    conn_type="gcpssh",
    login="cat",
    host="conn-host",
)
conn_uri = conn.get_uri()
with mock.patch.dict("os.environ", AIRFLOW_CONN_MY_CONN=conn_uri):
    assert "cat" == Connection.get("my_conn").login

有关详细信息,请参阅:模拟变量和连接

于 2021-12-08T18:03:26.803 回答