问题标签 [airflow-taskflow]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
95 浏览

airflow - 气流任务流 - 并行运行任务

想尝试新的任务流 API,我到了需要 2 个并行任务的地步。

使用 Airflow v1,我曾经做过类似的事情

我们现在调用任务的方式不同了PythonOperator

如何使用 TaskFlow 做列表?

谢谢

0 投票
1 回答
2176 浏览

python - 气流:在任务组中创建动态任务的问题

我正在尝试制作一个动态的工作流程。

我收到了这个损坏的 DAG 错误重复任务 ID

我的代码:

当我尝试在TaskGroup. 如果我删除TaskGroup它工作正常。

我在这里发现了这个问题https://github.com/apache/airflow/issues/8057。有没有办法解决这个错误?比如动态创建自定义task_id?我知道这可以使用PythonOperator. 但我正在尝试使用TaskFlow API来代替。

谢谢

0 投票
1 回答
124 浏览

bigdata - 气流动态生成的任务未按顺序运行

我创建了动态任务生成 dag。任务是准确生成的,但是这些任务不是按顺序触发的,也不是一致的。我注意到它按字母数字顺序触发。让我们检查一下 run_modification_ 任务。我已经生成了 0 到 29 个任务。我注意到它触发了以下格式。 run_modification_0 run_modification_1 run_modification_10 run_modification_11 run_modification_12 run_modification_13 run_modification_14 run_modification_15 run_modification_16 run_modification_17 run_modification_18 run_modification_19 run_modification_2 run_modification_21 run_modification_23....

但我需要按任务顺序运行它,例如
run_modification_0 run_modification_1 run_modification_2 run_modification_3 run_modification_4 run_modification_5..

请帮助我按任务创建顺序运行这些任务。

请查看相关的 dag 运行图像

0 投票
1 回答
261 浏览

airflow - 气流:如何将数据从装饰任务传递到 SimpleHttpOperator?

我最近开始使用 Apache 气流。我正在使用带有idGet_payloadSimpleHttpOperator. 任务Get_payload从数据库中获取数据,进行一些数据操作并返回一个dict作为有效负载。

问题

无法将数据从上一个任务传递到下一个任务。是的,我知道,XComs但使用 Taskflow API 的全部目的是避免与XComs. get_data直接传递给data.SimpleHttpOperator

到目前为止我尝试了什么?

正如这个 SO answer中提到的,我template_field在我的自定义传感器中使用来定义期望来自上一个任务的数据的字段。如果是SimpleHttpOperator操作员,我无法对其进行编辑以执行相同操作。那么如何在 中类似地解决它SimpleHttpOperator

我已经检查了这个 SO 答案这个

DAG:

完整日志:

0 投票
1 回答
367 浏览

airflow - 气流:在单个 DAG 文件中导入装饰任务与所有任务?

我最近开始使用 Apache Airflow 及其新概念 Taskflow API 之一。我有一个包含多个装饰任务的 DAG,其中每个任务都有 50 多行代码。所以我决定将每个任务移到一个单独的文件中。

在引用 stackoverflow 之后,我可以以某种方式将 DAG 中的任务移动到每个任务的单独文件中。现在,我的问题是:

  1. 下面显示的两个代码示例是否相同?(我担心任务的范围)。
  2. 他们将如何以黑白方式共享数据?
  3. 性能上有区别吗?(我读到由于性能问题不鼓励 Subdags,尽管这与 Subdags 无关)。

我在网络(和官方文档)中看到的所有代码示例都将所有任务放在一个文件中。

样品 1

示例 2文件夹结构:

文件任务_A.py

文件任务_B.py

文件Main_Dag.py

提前致谢!

0 投票
1 回答
96 浏览

python - 如何使用任务流 api 为 on_failure 指定函数

当 DAG 使用任务流 api 失败时,如何指定要运行的函数?使用旧样式,我可以指定一个函数来运行 on_failure,但我无法弄清楚或找到文档来使用带有 DAG 和任务运算符的任务流 api 来完成它。

0 投票
1 回答
59 浏览

airflow - 如何使用 TaskFlow Api 在两个任务之间创建共享子项?

我的代码如下所示:

dag 看起来像这样: 在此处输入图像描述

什么时候应该是这样的: 在此处输入图像描述

目标是last_task依赖taskataskb除那个taskataskb那个下载Data3请求。我无法使用它来实现它TaskFlow API

0 投票
1 回答
49 浏览

airflow - 使用taskflow api时命名Airflow dags而不是python可调用

我尝试使用任务流 API 创建多个 dag,其中传递了一个变量,dag 中的任务可以使用该变量

例如,我正在尝试使用此代码

理想情况下,这将创建两个 ID 为 dag_1 和 dag_2 的 dag。一个 dag 将打印字符串“string1”,其他 6 个。这几乎适用于创建 1 个 dag 且 ID 为 dag_template 打印 6 的代码。

文档中有 dag 将被称为 python 可调用,是否可以覆盖它。

0 投票
0 回答
14 浏览

airflow-2.x - 如何在 AirFlow 2.0 中动态创建 TaskFlow DAG?

我有一个参数化的 DAG,我想基于此 DAG 以编程方式创建 DAG 实例。

在传统的气流模型中,我可以使用循环轻松实现这一点:

如何使用 TaskFlow 模型在 AirFlow 2.0 中实现类似的行为?

我试图像下面的代码一样手动扩展@dag 装饰,但它不起作用。动态创建的 DAG 中没有任何任务: