1

我正在尝试从基于 docker 的气流服务转移到 AWS 提供的托管 apache 气流。MWAA 要求您指定一个包含所有 dag 的 dags 文件夹。此外,我可以在 dags 文件夹中创建其他文件夹和文件,并包含一个 .airflowignore 文件,以便将这些文件/文件夹视为非 dag。我可以在这些文件中指定一些常用函数并将它们导入到我的 dag 代码中以使用这些函数。到目前为止,一切都很好。当我必须在这些常见的非 dag 文件中使用某些环境变量时,问题就来了。我从这里找到了如何使用插件设置运行时操作系统环境变量https://docs.aws.amazon.com/mwaa/latest/userguide/samples-env-variables.html

from airflow.plugins_manager import AirflowPlugin
import os

os.environ["PATH"] = os.getenv("PATH") + ":/usr/local/airflow/.local/lib/python3.7/site-packages" 
os.environ["JAVA_HOME"]="/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.272.b10-1.amzn2.0.1.x86_64"
os.environ["My_Var"] = "hello"

class EnvVarPlugin(AirflowPlugin):                
     name = 'env_var_plugin'
   

使用此代码,我设置了一个 ENV 变量 My_Var。我的饲料结构看起来像这样

dags
|
|-hello_dag.py
|-common
|  |
|  |-varcheck.py

在 hello_dag.py 中,我从 varcheck.py 导入 VAR

import os
from common.varcheck import VAR
print(os.environ["My_Var"])
print(VAR)

在 varcheck.py 我正在做

import os
VAR = os.environ["My_Var"]

我收到一个导入错误

Broken DAG: [/usr/local/airflow/dags/hello_dag.py] Traceback (most recent call last):
  File "/usr/local/airflow/dags/varcheck.py", line 2, in <module>
    VAR = os.environ['MY_VAR']
  File "/usr/lib64/python3.7/os.py", line 681, in __getitem__
    raise KeyError(key) from None
KeyError: 'MY_VAR'

有趣的是,如果我将 varcheck.py 文件更改为

import os
VAR = os.environ

然后代码工作正常, print(VAR) 打印所有环境变量。我想知道我们如何在 MWAA 的非 dag 文件中使用 os 环境变量,因为这对我们的实现至关重要。

编辑:回购的当前结构

repo
|-common (used by all others)
|-airflow (deployed in ECS)
|-jobs_1 (deployed in batch)
|-jobs_2 (deployed in batch)
|-jobs_3 (deployed in lambda)

公共文件使用环境变量。如果我们要用气流变量替换它们,我们需要为气流维护单独的公共文件,因为 job_1、job_2 和 job_3 与气流无关,它们使用公共文件。

4

2 回答 2

0

环境变量取决于 CASE。

与您的帖子相关:
在您使用的第一个帖子代码块中

os.environ["My_Var"] = "hello"

在您的堆栈跟踪中,我们看到

VAR = os.environ['MY_VAR']

但是,如果您计划在多个地方使用此环境变量,但气流不需要启动,您也可以考虑使用 Airflow 变量。

于 2021-06-28T06:33:33.460 回答
0

问题出在您的插件文件中。

VAR = os.environ["My_Var"]

没有设置名称为My_Var的环境变量,因此它会给您一个 KeyError。顺便说一句,您可能想尝试 Airflow 变量。

于 2021-06-28T07:02:25.633 回答