问题标签 [apache-airflow]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
1005 浏览

airflow - 气流不会更新要完成的 dag/task 的进度,即使 dag/task 已实际完成

我已将气流设置为在具有 10 个工作节点的分布式模式下运行。我试图通过触发一个仅包含 1 个任务的测试 dag 来访问并行工作负载的性能,该任务仅休眠 3 秒然后出来。

我使用命令气流回填 test_dag -s 2015-06-20 -e 2015-07-10 触发了 dag

调度程序并行启动作业/dag,我经常看到以下 o/p:[2017-06-27 09:52:29,611] {models.py:4024} INFO - 更新状态以考虑 1 个任务[2017-06-27 09:52:29,647] {models.py:4024} INFO - 更新状态以考虑 1 个任务 [2017-06-27 09:52:29,664] {jobs.py:1983} INFO - [回填进度] | 已完成第 19 次(共 21 次)| 任务等待:0 | 成功:19 | 开球:2 | 失败:0 | 跳过:0 | 死锁:0 | 未准备好:0

此处 kicked_off:2 表示启动了 2 个任务,但是当我看到 dag 运行状态的 UI 时,我看到 2 个 dag 实例正在运行。当我查看相应的任务实例日志时,它表明任务已成功完成,但上述消息仍无限显示在命令提示符中

[2017-06-27 09:52:29,611] {models.py:4024} INFO - 更新状态以考虑 1 个任务 [2017-06-27 09:52:29,647] {models.py:4024} INFO - 更新状态以考虑 1 个任务 [2017-06-27 09:52:29,664] {jobs.py:1983} 信息 - [回填进度] | 已完成第 19 次(共 21 次)| 任务等待:0 | 成功:19 | 开球:2 | 失败:0 | 跳过:0 | 死锁:0 | 未准备好:0

是不是工人正在发送的消息被丢弃,因此状态没有得到更新?

airflow.cfg 文件中是否有任何参数允许在其他工作节点上重试此类失败的作业,而不是无限等待负责执行 aobe 失败任务的工作节点的消息。

0 投票
4 回答
10996 浏览

airflow - 即使从配置中删除,Airflow 仍将继续显示示例 dag

load_examples = False即使在配置文件中关闭后,气流示例 dag 仍保留在 UI中。

在此处输入图像描述

系统通知 dag 不存在于 dag 文件夹中,但它们仍保留在 UI 中,因为调度程序已在元数据数据库中将其标记为活动。

我知道从那里删除它们的一种方法是直接删除数据库中的这些行,但当然这并不理想。我应该如何继续从 UI 中删除这些 dag?

0 投票
1 回答
495 浏览

python - 气流回填 dag 运行依赖

当我为特定日期回填 dag 时,我想按顺序运行它,即我希望它每天运行

完成特定日期和第二天的所有任务,依此类推。我使用了 depends_on_past 参数,但它只是帮助我设置对不在 dag 运行中的任务的依赖。

示例:- Dag_A 有 4 个任务,我使用带有depends_on_past 的回填,在 Dag_A(第一天)执行第一个任务后,它触发 Dag_A(第二天)的第一个任务,我不想要它

0 投票
1 回答
163 浏览

airflow - 将气流从 1.8.0 升级到 1.8.1 时暂停的 dag 重新启动?

最近我将气流从 1.8.0 升级到 1.8.1。升级很顺利,但是一旦我重新启动 Web 服务器和调度程序,所有暂停的 dag 都会自动重新启动,并从停止之日起开始运行多次。它弄乱了大部分用户数据,我们需要手动清理。我们如何在未来的升级中防止这种情况发生?

0 投票
1 回答
3471 浏览

airflow - 在相同的 Airflow 版本上运行 Airflow 升级命令是否安全?

Airflow 有一个 upgradeb 命令,在升级 Airflow 版本时需要运行该命令。我想知道即使版本相同也可以安全运行

0 投票
4 回答
18188 浏览

airflow - 使自定义 Airflow 宏扩展其他宏

有没有办法在 Airflow 中创建一个用户定义的宏,它本身是从其他宏计算的?

next_execution_date这里的用例是将新的 Airflow v1.8宏反向移植到 Airflow v1.7 中。不幸的是,这个模板是在没有宏扩展的情况下呈现的:

0 投票
4 回答
31510 浏览

python - 如何向 Airflow 添加新的 DAG?

我在一个名为(实际上是教程中提供tutorial_2.py的副本,除了更改为)的文件中定义了一个 DAG。tutorial.pyairflowdag_idtutorial_2

当我查看我的默认未修改airflow.cfg(位于 中~/airflow)时,我看到它dags_folder设置为/home/alex/airflow/dags.

我愿意cd /home/alex/airflow; mkdir dags; cd dags; cp [...]/tutorial_2.py tutorial_2.py。现在我有一个dags与 中设置的路径匹配的文件夹airflow.cfg,其中包含tutorial_2.py我之前创建的文件。

但是,当我运行时airflow list_dags,我只获得与默认教程 DAG 对应的名称。

tutorial_2想出现在我的 DAG 列表中,这样我就可以开始与之交互了。既没有python tutorial_2.pyairflow resetdb没有使它出现在列表中。

我该如何补救?

0 投票
1 回答
450 浏览

python - 气流任务参数从上一个任务返回

如何将函数参数设置为从先前运行的任务/函数返回的任务。请注意,这些任务是以编程方式定义的,因此我不能简单地使用xcom_pull(task_id="some_task"),因为这些任务是在循环中定义的(如下所示):

0 投票
0 回答
1081 浏览

airflow - 气流“one_success”任务未触发

我正在使用 LocalExecutor 在 4 CPU 机器上运行 Airflow

我已经将上游任务定义为一次成功

...

但即使有些被明确标记为成功,任务也不会触发

“下载任务”确实并行运行,所以这不是问题

图形

检查任务显示:

依赖性:未知

原因:满足所有依赖关系但任务实例未运行。在大多数情况下,这只是意味着任务可能很快就会被安排,除非: - 调度程序已关闭或负载过重 - 此任务实例已经运行并且手动更改了它的状态(例如在 UI 中清除)

我查看了负载,它确实很高:

平均负载:2.45、3.55、3.71 CPU 在 50-60%

但是其他的任务都已经完成了,应该有空闲的资源可以开始下一个任务吧?

0 投票
3 回答
12549 浏览

python - 如何设置供气流使用的环境变量?

Airflow 在尝试运行 DAG 时返回错误,说它找不到环境变量,这很奇怪,因为它能够找到我存储为 Python 变量的 3 个其他环境变量。这些变量根本没有问题。

我有所有 4 个变量~/.profile并且也完成了

在什么用户下airflow运行?我也完成了这些export命令,所以我认为它们会在运行 dag 时sudo被拾取airflow