2

我们正在建立一个 Airflow 框架,多个数据科学家团队可以在其中协调他们的数据处理管道。我们开发了一个 Python 代码库来帮助他们实现 DAG,其中包括各种包和模块中的函数和类(也包括 Operator 子类)。

每个团队都会将自己的 DAG 与包中的函数和类一起打包在一个 ZIP 文件中。例如,第一个 ZIP 文件将包含

邮编1:

main_dag_teamA.py

子文件夹1:package1-with-generic-functions + init .py

子文件夹2:package2-with-generic-operators + init .py

另一个 ZIP 文件将包含

邮编2:

main_dag_teamB.py

子文件夹1:package1-with-generic-functions + init .py

子文件夹2:package2-with-generic-operators + init .py

请注意,在两个 ZIP 文件中,子文件夹 1 和子文件夹 2 通常完全相同,这意味着完全相同的文件具有相同的功能和类。但随着时间的推移,当新版本的包可用时,包内容将开始在 DAG 包中出现偏差。

通过这种设置,我们遇到了以下问题:当包/子文件夹的内容开始在 ZIP 中出现偏差时,Airflow 似乎不能很好地处理同名包。因为当我运行“airflow list_dags”时,它会显示如下错误:

文件“/data/share/airflow/dags/program1/program1.zip/program1.py”,第 1 行,在 > from subfolder1.functions1 import function1 ImportError: No module named 'subfolder1.functions1'

可以使用以下代码重现问题,其中两个小 DAG 与包 my_functions 一起位于其 ZIP 文件中,该包具有相同的名称,但内容不同。

DAG 包 ZIP 1:

程序1.py

from my_functions.functions1 import function1

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def do_it():
    print('program1')

dag = DAG(
    'program1',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2019, 6, 23)
)

hello_operator = PythonOperator(task_id='program1_task1', python_callable=do_it, dag=dag)

my_functions/functions1.py:

def function1():
    print('function1')

DAG 包 ZIP 2:

程序2.py:

from my_functions.functions2 import function2

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def do_it():
    print('program1')

dag = DAG(
    'program1',
    schedule_interval=None,
    catchup=False,
    start_date=datetime(2019, 6, 23)
)

hello_operator = PythonOperator(task_id='program2_task2', python_callable=do_it, dag=dag)

my_functions/functions2.py:

def function2():
    print('function2')

当我运行“airflow list_dags”时,使用这两个 ZIP 文件会显示错误:

文件“/data/share/airflow/dags/program1/program1.zip/program1.py”,第 1 行,从 subfolder1.functions1 导入 function1 ImportError: No module named 'subfolder1.functions1'

当 ZIP 中的子文件夹内容相同时,不会发生错误。

我的问题:如何防止 ZIP 中的子文件夹冲突?我真的很想拥有完全独立于代码的 DAG,并带有自己的软件包版本。

4

1 回答 1

1

通过在 DAG(program1.py 和 program2.py)顶部执行以下操作来解决,在

from my_functions.functions1 import function1

from my_functions.functions2 import function2

代码:

import sys

# Cleanup up the already imported function module
cleanup_mods = []
for mod in sys.modules:
    if mod.startswith("function"):
        cleanup_mods.append(mod)
for mod in cleanup_mods:
    del sys.modules[mod]

这可以确保每次解析 DAG,导入的库都被清理。

于 2019-12-17T10:52:05.757 回答