13

从气流文档中:

SubDAGs must have a schedule and be enabled. If the SubDAG’s schedule is set to None or @once, the SubDAG will succeed without having done anything

我了解 subdagooperator 实际上是作为 BackfillJob 实现的,因此我们必须向schedule_interval操作员提供一个。但是,有没有办法获得schedule_interval="@once"subdag 的语义等价物?我担心如果我对 subdag 使用 set ,如果schedule_interval="@daily"subdag 的运行时间超过一天,那么 subdag 可能会运行不止一次。

def subdag_factory(parent_dag_name, child_dag_name, args):
    subdag = DAG(
        dag_id="{parent_dag_name}.{child_dag_name}".format(
            parent_dag_name=parent_dag_name, child_dag_name=child_dag_name
        ),
        schedule_interval="@daily", # <--- this bit here
        default_args=args
    )

    ... do more stuff to the subdag here
    return subdag

TLDR:如何伪造“每次触发父 dag 时只运行一次这个 subdag”

4

2 回答 2

6

我发现这 schedule=@once对我的 subdags 很好。也许我的版本已经过时了,但是即使所有任务都成功(或被跳过),我的 subdags失败的问题也比相反的要多。

实际运行的示例代码现在在我的机器上运行得非常愉快:

subdag_name = ".".join((parent_name,child_name))
logging.info(parent_name)
logging.info(subdag_name)
dag_subdag = DAG(
    dag_id=subdag_name,
    default_args=dargs,
    schedule_interval="@once",
)

事实上,我最初将几乎所有的 dag 构建为我的 subdag 的美化 cfg 文件。不确定经过反复试验后的想法有多好,但计划间隔对我来说从来都不是障碍。

我正在运行一个相对较新的 1.8 版本,几乎没有自定义。我一直在遵循示例 dag 建议,将我的 subdags 保存在 dags 文件夹内的文件夹中,这样它们就不会出现在 DagBag 中。

于 2017-04-22T03:08:07.607 回答
2

为 subdag尝试使用schedule=None的外部触发模式。在这种情况下,它只会在被父 dag 触发时运行

于 2017-04-20T05:48:09.313 回答