我们正在建立一个 Airflow 框架,多个数据科学家团队可以在其中协调他们的数据处理管道。我们开发了一个 Python 代码库来帮助他们实现 DAG,其中包括各种包和模块中的函数和类(也包括 Operator 子类)。
每个团队都会将自己的 DAG 与包中的函数和类一起打包在一个 ZIP 文件中。例如,第一个 ZIP 文件将包含
邮编1:
main_dag_teamA.py
子文件夹1:package1-with-generic-functions + init .py
子文件夹2:package2-with-generic-operators + init .py
另一个 ZIP 文件将包含
邮编2:
main_dag_teamB.py
子文件夹1:package1-with-generic-functions + init .py
子文件夹2:package2-with-generic-operators + init .py
请注意,在两个 ZIP 文件中,子文件夹 1 和子文件夹 2 通常完全相同,这意味着完全相同的文件具有相同的功能和类。但随着时间的推移,当新版本的包可用时,包内容将开始在 DAG 包中出现偏差。
通过这种设置,我们遇到了以下问题:当包/子文件夹的内容开始在 ZIP 中出现偏差时,Airflow 似乎不能很好地处理同名包。因为当我运行“airflow list_dags”时,它会显示如下错误:
文件“/data/share/airflow/dags/program1/program1.zip/program1.py”,第 1 行,在 > from subfolder1.functions1 import function1 ImportError: No module named 'subfolder1.functions1'
可以使用以下代码重现问题,其中两个小 DAG 与包 my_functions 一起位于其 ZIP 文件中,该包具有相同的名称,但内容不同。
DAG 包 ZIP 1:
程序1.py
from my_functions.functions1 import function1
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def do_it():
print('program1')
dag = DAG(
'program1',
schedule_interval=None,
catchup=False,
start_date=datetime(2019, 6, 23)
)
hello_operator = PythonOperator(task_id='program1_task1', python_callable=do_it, dag=dag)
my_functions/functions1.py:
def function1():
print('function1')
DAG 包 ZIP 2:
程序2.py:
from my_functions.functions2 import function2
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def do_it():
print('program1')
dag = DAG(
'program1',
schedule_interval=None,
catchup=False,
start_date=datetime(2019, 6, 23)
)
hello_operator = PythonOperator(task_id='program2_task2', python_callable=do_it, dag=dag)
my_functions/functions2.py:
def function2():
print('function2')
当我运行“airflow list_dags”时,使用这两个 ZIP 文件会显示错误:
文件“/data/share/airflow/dags/program1/program1.zip/program1.py”,第 1 行,从 subfolder1.functions1 导入 function1 ImportError: No module named 'subfolder1.functions1'
当 ZIP 中的子文件夹内容相同时,不会发生错误。
我的问题:如何防止 ZIP 中的子文件夹冲突?我真的很想拥有完全独立于代码的 DAG,并带有自己的软件包版本。