我正在尝试使用 Airflow 来执行一个简单的任务 python。
from __future__ import print_function
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta
from pprint import pprint
seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
datetime.min.time())
args = {
'owner': 'airflow',
'start_date': seven_days_ago,
}
dag = DAG(dag_id='python_test', default_args=args)
def print_context(ds, **kwargs):
pprint(kwargs)
print(ds)
return 'Whatever you return gets printed in the logs'
run_this = PythonOperator(
task_id='print',
provide_context=True,
python_callable=print_context,
dag=dag)
如果我尝试,例如:
气流测试 python_test 打印 2015-01-01
有用!
现在我想把我的def print_context(ds, **kwargs)
函数放在其他 python 文件中。所以我创建了一个名为:simple_test.py 的文件并更改:
run_this = PythonOperator(
task_id='print',
provide_context=True,
python_callable=simple_test.print_context,
dag=dag)
现在我尝试再次运行:
气流测试 python_test 打印 2015-01-01
好!它仍然有效!
但是如果我创建一个模块,例如,带有文件的工作模块SimplePython.py
,导入(from worker import SimplePython
)它并尝试:
气流测试 python_test 打印 2015-01-01
它给出了信息:
ImportError:没有名为worker的模块
问题:
- 是否可以在 DAG 定义中导入模块?
- Airflow+Celery 将如何在工作节点之间分发所有必要的 python 源文件?