问题标签 [apache-airflow]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
airflow - 气流不会更新要完成的 dag/task 的进度,即使 dag/task 已实际完成
我已将气流设置为在具有 10 个工作节点的分布式模式下运行。我试图通过触发一个仅包含 1 个任务的测试 dag 来访问并行工作负载的性能,该任务仅休眠 3 秒然后出来。
我使用命令气流回填 test_dag -s 2015-06-20 -e 2015-07-10 触发了 dag
调度程序并行启动作业/dag,我经常看到以下 o/p:[2017-06-27 09:52:29,611] {models.py:4024} INFO - 更新状态以考虑 1 个任务[2017-06-27 09:52:29,647] {models.py:4024} INFO - 更新状态以考虑 1 个任务 [2017-06-27 09:52:29,664] {jobs.py:1983} INFO - [回填进度] | 已完成第 19 次(共 21 次)| 任务等待:0 | 成功:19 | 开球:2 | 失败:0 | 跳过:0 | 死锁:0 | 未准备好:0
此处 kicked_off:2 表示启动了 2 个任务,但是当我看到 dag 运行状态的 UI 时,我看到 2 个 dag 实例正在运行。当我查看相应的任务实例日志时,它表明任务已成功完成,但上述消息仍无限显示在命令提示符中
[2017-06-27 09:52:29,611] {models.py:4024} INFO - 更新状态以考虑 1 个任务 [2017-06-27 09:52:29,647] {models.py:4024} INFO - 更新状态以考虑 1 个任务 [2017-06-27 09:52:29,664] {jobs.py:1983} 信息 - [回填进度] | 已完成第 19 次(共 21 次)| 任务等待:0 | 成功:19 | 开球:2 | 失败:0 | 跳过:0 | 死锁:0 | 未准备好:0
是不是工人正在发送的消息被丢弃,因此状态没有得到更新?
airflow.cfg 文件中是否有任何参数允许在其他工作节点上重试此类失败的作业,而不是无限等待负责执行 aobe 失败任务的工作节点的消息。
python - 气流回填 dag 运行依赖
当我为特定日期回填 dag 时,我想按顺序运行它,即我希望它每天运行
完成特定日期和第二天的所有任务,依此类推。我使用了 depends_on_past 参数,但它只是帮助我设置对不在 dag 运行中的任务的依赖。
示例:- Dag_A 有 4 个任务,我使用带有depends_on_past 的回填,在 Dag_A(第一天)执行第一个任务后,它触发 Dag_A(第二天)的第一个任务,我不想要它
airflow - 将气流从 1.8.0 升级到 1.8.1 时暂停的 dag 重新启动?
最近我将气流从 1.8.0 升级到 1.8.1。升级很顺利,但是一旦我重新启动 Web 服务器和调度程序,所有暂停的 dag 都会自动重新启动,并从停止之日起开始运行多次。它弄乱了大部分用户数据,我们需要手动清理。我们如何在未来的升级中防止这种情况发生?
airflow - 在相同的 Airflow 版本上运行 Airflow 升级命令是否安全?
Airflow 有一个 upgradeb 命令,在升级 Airflow 版本时需要运行该命令。我想知道即使版本相同也可以安全运行
airflow - 使自定义 Airflow 宏扩展其他宏
有没有办法在 Airflow 中创建一个用户定义的宏,它本身是从其他宏计算的?
next_execution_date
这里的用例是将新的 Airflow v1.8宏反向移植到 Airflow v1.7 中。不幸的是,这个模板是在没有宏扩展的情况下呈现的:
python - 如何向 Airflow 添加新的 DAG?
我在一个名为(实际上是教程中提供tutorial_2.py
的副本,除了更改为)的文件中定义了一个 DAG。tutorial.py
airflow
dag_id
tutorial_2
当我查看我的默认未修改airflow.cfg
(位于 中~/airflow
)时,我看到它dags_folder
设置为/home/alex/airflow/dags
.
我愿意cd /home/alex/airflow; mkdir dags; cd dags; cp [...]/tutorial_2.py tutorial_2.py
。现在我有一个dags
与 中设置的路径匹配的文件夹airflow.cfg
,其中包含tutorial_2.py
我之前创建的文件。
但是,当我运行时airflow list_dags
,我只获得与默认教程 DAG 对应的名称。
我tutorial_2
想出现在我的 DAG 列表中,这样我就可以开始与之交互了。既没有python tutorial_2.py
也airflow resetdb
没有使它出现在列表中。
我该如何补救?
python - 气流任务参数从上一个任务返回
如何将函数参数设置为从先前运行的任务/函数返回的任务。请注意,这些任务是以编程方式定义的,因此我不能简单地使用xcom_pull(task_id="some_task")
,因为这些任务是在循环中定义的(如下所示):
airflow - 气流“one_success”任务未触发
我正在使用 LocalExecutor 在 4 CPU 机器上运行 Airflow
我已经将上游任务定义为一次成功
...
但即使有些被明确标记为成功,任务也不会触发
“下载任务”确实并行运行,所以这不是问题
检查任务显示:
依赖性:未知
原因:满足所有依赖关系但任务实例未运行。在大多数情况下,这只是意味着任务可能很快就会被安排,除非: - 调度程序已关闭或负载过重 - 此任务实例已经运行并且手动更改了它的状态(例如在 UI 中清除)
我查看了负载,它确实很高:
平均负载:2.45、3.55、3.71 CPU 在 50-60%
但是其他的任务都已经完成了,应该有空闲的资源可以开始下一个任务吧?
python - 如何设置供气流使用的环境变量?
Airflow 在尝试运行 DAG 时返回错误,说它找不到环境变量,这很奇怪,因为它能够找到我存储为 Python 变量的 3 个其他环境变量。这些变量根本没有问题。
我有所有 4 个变量~/.profile
并且也完成了
在什么用户下airflow
运行?我也完成了这些export
命令,所以我认为它们会在运行 dag 时sudo
被拾取airflow