我有一个气流 DAG“example_ml.py”,它有一个任务“train_ml_model”,这个任务正在调用/运行一个 python 脚本“training.py”。
-Dags/example_ml.py -Dags/training.py
DAG 任务的代码片段:
train_model = PythonOperator(
task_id='train_model',
python_callable=training,
dag = dag
)
PS:我使用的是k8s集群。Airflow在k8s集群中运行,executor设置为kubernetesExecutor。因此,当每个 DAG 被触发时,都会分配一个新的 pod 来完成任务。