我正在扩展 Airflow:2.2.0 图像并尝试在 DAG 中使用 Connection 来使用自定义 Hook 发出 GET 请求。但是,无论我尝试什么,并遵循找到的任何建议,DAG 似乎都没有获得密码和额外字段的解密版本。
DAG 的输出正确地显示了连接信息(逻辑上编辑为源代码在打印时指示),但尝试使用它或打印它不会解密密码。
我正在使用
[var_name]
以下内容编辑所有敏感/个人信息
我尝试直接在PythonOperator
DAG 中获取详细信息:
conn = BaseHook.get_connection(conn_id=[conn_id])
print(conn.password)
print(conn.extra)
并从导入的自定义挂钩中
# inside the PythonOperator
with JwtHook(conn_id=[conn_id]) as api:
result = api.get_metadata()
print(result)
# with JwtHook partially doing (again, redacted majority of code):
...
def get_conn(self):
"""
Returns the connection used by the hook for querying data.
Should in principle not be used directly.
"""
# Return existing session if initiated
if self._session is None:
# Fetch config for the given connection (host, login, etc).
config = self.get_connection(self._conn_id)
print(config.extra)
print(config.extra_dejson)
if not config.host or not self._extras.get("token_host"):
raise ValueError(
f"No (token)host specified in connection {self._conn_id}"
)
self._login = (config.login, config.password)
print(self._login)
...
...
{base.py:79} INFO - Using connection to: id: [conn_id]. Host: [url], Port: None, Schema: None, Login: [username], Password: ***, extra: {'token_host': '***', 'user_id': ''}
{logging_mixin.py:109} INFO - {"token_host": "***","user_id": ""}
{logging_mixin.py:109} INFO - {'token_host': '***', 'user_id': ''}
{logging_mixin.py:109} INFO - ([username], '***')
我在耗尽互联网后尝试、遵循和实施的事情:
- 跨容器设置共享的 Fernet 密钥
运行后确认共享使用# set up for all containers in docker-compose environment: &airflow-common-env AIRFLOW__CORE__FERNET_KEY: ${_AIRFLOW__CORE__FERNET_KEY}
对于所有容器。docker-compose run airflow-[containername] airflow config get-value core fernet_key
- 我已经通过 Web UI 和命令行创建了连接。无论哪种方式,Web UI 都很好地显示了所有详细信息的连接,命令
connections get [conn_id]
输出它与所有解密的秘密相同。id | conn_id | conn_type | description | host | schema | login | password | port | is_encrypted | is_extra_encrypted | extra_dejson | get_uri 1 | [conn_id] | HTTP | Custom API connection | [url] | None | [username] | [password] | None | True | True | {'token_host': [url], 'user_id': [id]} | [uri]
对此进行排序的任何帮助都会很棒。我只是无法确定问题 - 并且可能我错误地使用了 Airflow。有趣的是,我没有收到任何错误(如这里)。我想知道 DAG 执行者是否具有与 Connection 创建者相同的权利 - 但我找不到/看不到如何确认这种怀疑。
任何帮助深表感谢。感谢您提前阅读!
[编辑]
所以,我没有尝试test
从 CLI 运行 DAG - 你知道什么 - 它有效!相反,我是从 UI 手动触发 DAG - 但因为我只有一个(管理员)用户 - 我并不怀疑它不会工作。所以我很清楚这是某种创作问题。有人对如何正确设置有任何指示吗?谢谢!