0

考虑我在 Airflow 中有多个 DAG。

DAG 中的每个任务都尝试执行 presto 查询,我只是覆盖了气流中的 get_conn() 方法。在每次调用 get_conn() 方法时,它都会从 AWS 机密管理器获取凭证。

对秘密管理器的最大请求是 5000。在这种情况下,我需要在某处缓存我的凭据(不应使用连接/变量、DB、S3),以便它们可以在所有任务中使用,而无需调用秘密管理器。

我的问题是,

有什么方法可以通过一次调用 get_conn() 来使用 Python/Airflow 在我们的代码中处理这些凭据?

4

1 回答 1

1

您可以编写自己的自定义秘密后端https://airflow.apache.org/docs/apache-airflow/stable/security/secrets/secrets-backend/index.html#roll-your-own-secrets-backend扩展 AWS一个并覆盖读取凭据并将其存储在某处的方法(例如,在本地文件或数据库中作为缓存机制)。

但是,如果您使用的是本地文件系统,则必须意识到您的缓存重用/效率将取决于您的任务的运行方式。如果您正在运行 CeleryExecutor,那么这样的本地文件将可用于在同一 worker 上运行的所有进程(但不适用于在其他 worker 上运行的 celery 进程)。如果您正在运行 KubernetesExecutor,则每个任务都在其自己的 Pod 中运行,因此您必须将一些持久性或临时存储挂载/映射到您的 PODS 内才能重用它。另外,您必须以某种方式解决并发进程在那里写入并定期刷新此类缓存或在其更改时刷新的问题。

此外,您必须格外小心,因为它带来了一些关于安全性的问题,因为所有 DAG 和 python 代码都可以使用本地缓存,即使它们没有使用连接(例如 Airflow 2.1+ 内置自动化在这种情况下,秘密屏蔽将不起作用,您必须小心不要将凭据打印到日志中。

于 2021-07-14T10:50:18.733 回答