21

关于“动态任务”的其他问题似乎涉及在计划或设计时动态构建 DAG。我有兴趣在执行期间将任务动态添加到 DAG。

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

dag = DAG('test_dag', description='a test',
          schedule_interval='0 0 * * *',
          start_date=datetime(2018, 1, 1),
          catchup=False)

def make_tasks():
    du1 = DummyOperator(task_id='dummy1', dag=dag)
    du2 = DummyOperator(task_id='dummy2', dag=dag)
    du3 = DummyOperator(task_id='dummy3', dag=dag)
    du1 >> du2 >> du3

p = PythonOperator(
    task_id='python_operator',
    dag=dag,
    python_callable=make_tasks)

这种幼稚的实现似乎不起作用 - 虚拟任务永远不会出现在 UI 中。

在执行期间向 DAG 添加新运算符的正确方法是什么?可能吗?

4

3 回答 3

8

在执行期间无法修改 DAG(无需更多工作)。

调度程序在循环中dag = DAG(...拾取。它将有任务实例'python_operator'。该任务实例在 dag 运行中安排,并由工作人员或执行程序执行。由于 Airflow DB 中的 DAG 模型仅由调度程序更新,因此这些添加的虚拟任务不会被持久化到 DAG,也不会被调度运行。当工人退出时,它们将被遗忘。除非您从调度程序中复制有关持久化和更新模型的所有代码……但这将在调度程序下次访问 DAG 文件进行解析时撤消,这可能每分钟发生一次,每秒一次或更快,具体取决于其他多少DAG 文件要解析。

Airflow 实际上希望每个 DAG 在运行之间大致保持相同的布局。它还想不断地重新加载/解析 DAG 文件。因此,尽管您可以制作一个 DAG 文件,在每次运行时根据一些外部数据动态确定任务(最好缓存在文件或 pyc 模块中,而不是像数据库查找那样的网络 I/O,但您会减慢整个调度循环对于所有DAG)这不是一个好计划,因为您的图表和树视图会变得混乱,并且您的调度程序解析将因查找而变得更加繁重。

你可以让 callable 运行每个任务……</p>

def make_tasks(context):
    du1 = DummyOperator(task_id='dummy1', dag=dag)
    du2 = DummyOperator(task_id='dummy2', dag=dag)
    du3 = DummyOperator(task_id='dummy3', dag=dag)
    du1.execute(context)
    du2.execute(context)
    du3.execute(context)

p = PythonOperator(
    provides_context=true,

但这是顺序的,你必须弄清楚如何使用 python 使它们并行(使用期货?),如果有任何引发异常,整个任务都会失败。此外,它绑定到一个执行者或工作人员,因此不使用气流的任务分配(kubernetes、mesos、celery)。

另一种解决方法是添加固定数量的任务(最大数量),并使用可调用对象来短路不需要的任务或使用 xcom 为每个任务推送参数,从而更改它们在运行时的行为但不改变 DAG。

于 2019-02-08T05:34:19.243 回答
2

关于您的代码示例,您永远不会调用在 DAG 中注册任务的函数。

要拥有一种动态任务,您可以使用一个运算符来根据某些状态执行不同的操作,或者您可以使用 ShortCircuitOperator 使用一些可以根据状态跳过的运算符。

于 2018-02-05T15:53:54.480 回答
1

我感谢大家在这里所做的所有工作,因为我面临着创建动态结构化 DAG 的相同挑战。我已经犯了足够多的错误,以至于没有使用软件来违背其设计。如果我不能在 UI 上检查整个运行并放大和缩小,基本上使用气流功能,这也是我使用它的主要原因。我可以在一个函数中编写多处理代码并完成它。

话虽这么说,我的解决方案是使用资源管理器,例如 redis 锁定,并有一个 DAG 向该资源管理器写入有关运行内容的数据如何运行等;并让另一个或多个以特定间隔运行的 DAG 轮询资源管理器,在运行之前锁定它们并在完成时将它们删除。这样至少我可以按预期使用气流,即使它的规格不能完全满足我的需求。我将问题分解为更可定义的块。这些解决方案是有创意的,但它们违背了设计并且未经开发人员测试。特别是说有固定的结构化工作流程。除非我重写核心气流代码并进行自我测试,否则我无法解决未经测试和违反设计的代码。

于 2019-09-21T22:55:51.437 回答