问题标签 [airflow]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
2659 浏览

python - 气流数据库会话不提供任何环境变量

作为一个 Airflow 和 Python 新手,我什至不知道我问的问题是否正确,但无论如何都要问。我在 CentOS 系统上配置了气流。使用远程 MySql 实例作为后端。在我的代码中,需要获取一些变量,代码如下所示:

我收到以下错误:

回溯(最后一次调用):文件“test2.py”,第 5 行,在 local_env['SHELL'] = Variable.get('SHELL') 文件“/com/work/airflowenv/lib/python2.7/site -packages/airflow/utils/db.py”,第 53 行,在包装结果 = func(*args, **kwargs) 文件“/com/work/airflowenv/lib/python2.7/site-packages/airflow/models .py",第 3134 行,在 get raise ValueError('Variable {} 不存在'.format(key)) ValueError: 变量 SHELL 不存在

在models.py的这段代码中,正是Variable.get()方法抛出了异常:

其中 session.query 已经产生无。不太明白这里是如何注入会话的。以及为什么没有设置这些会话变量。我们应该在远程 MySQL 实例上设置一些东西吗?

顺便说一句,我们在具有本地 mysql 实例的另一台机器上有另一个相同的气流实例。并且单独运行我提供的脚本没有问题:

[2016-09-27 01:54:48,341] { init .py:36} 信息 - 使用执行器

本地执行器

/bin/bash /bin/bash

设置气流时我错过了什么?谢谢,

0 投票
2 回答
9089 浏览

python - 使用 Airflow UI 执行整个 DAG

我是气流的新手,我们有一个有 3 个任务的 DAG。目前我们正在使用 Celery Executor,因为我们需要灵活地运行单个任务。我们不想安排工作流程,现在它将是手动触发。有什么方法可以使用 Airflow UI 执行整个工作流程(与我们在 oozie 中的相同)?

一次执行一项任务很痛苦。

0 投票
1 回答
155 浏览

python - Airflow 无法从 UI 一次性执行所有相关任务

我的 DAG 有 3 个任务,我们正在使用 Celery 执行器,因为我们必须从 UI 触发单个任务。我们能够从 UI 执行单个任务。

我们目前面临的问题是,尽管我们已经设置了任务依赖项,但我们无法一次性从 UI 执行 DAG 的所有步骤。

我们可以从命令行执行完整的 DAG,但有没有办法通过 UI 执行相同的操作?

0 投票
3 回答
8823 浏览

airflow - 气流:1.7:DAG 未在 GUI 中更新

test1.py在目录下定义了我的 dag 文件/opt/airflow/dags。最初,该文件只有一个任务,test1_task01. 在我的 GUI 中,dag 只有一项任务才能正确显示。但是,当我向我的 DAG 添加另一个任务时,如下所示:

我在我的 GUI 中没有看到更新的 DAG。GUI 仍然有旧的 DAG。airflow list_tasks <dagname>显示正确的 DAG 关系并airflow scheduler执行最新版本,但 GUI 不知何故并未提取最新信息。

如何通过 GUI 获得 DAG 的更新视图?

0 投票
3 回答
39392 浏览

java - 如何在 Airflow 中运行 Spark 代码?

地球人你好!我正在使用 Airflow 来安排和运行 Spark 任务。这次我发现的只是 Airflow 可以管理的 python DAG。
DAG 示例:

问题是我不擅长 Python 代码并且有一些用 Java 编写的任务。我的问题是如何在 python DAG 中运行 Spark Java jar?或者也许还有其他方法可以做到这一点?我发现火花提交:http
://spark.apache.org/docs/latest/submitting-applications.html但我不知道如何将所有东西连接在一起。也许有人以前使用过它并且有工作示例。感谢您的时间!

0 投票
1 回答
7618 浏览

python - BaseOperator.xcom_pull 中的上下文参数是什么

我正在阅读 API Docs,但不清楚BaseOperator.xcom_pull的上下文参数是什么。

我以为会是dag.default_args,但我收到了KeyError: 'ti'

我在此处的 push() 示例之后的先前任务中执行了 xcom_push 。

0 投票
4 回答
2055 浏览

airflow - 如何在 Airflow 中动态创建的任务之间创建依赖关系

我正在使用以下代码创建动态任务。我想创建对这些动态创建的任务的依赖。例如,runStep_0 应该依赖于 runStep_1 等。

0 投票
1 回答
1167 浏览

python - 气流不会触发带有“LocalExecutor”的并发 DAG

我正在使用气流 1.7.1.3。

我对并发 DAG / 任务有疑问。当一个 DAG 运行时,调度程序不再启动其他 DAG。调度程序似乎完全冻结(不再有日志)......直到正在运行的 DAG 完成。然后,新的 DAGrun 被触发。我的不同任务是长时间运行的 ECS 任务(~10 分钟)

我使用LocalExecutor并且我让默认配置大约parallelism=32dag_concurrency=16. 我自动使用airflow scheduler -n 20并重新启动它,并为我'depends_on_past': False的所有 DAG 声明进行设置。

有关信息,我在 ECS 集群中运行的容器中部署了气流。max_threads = 2我只有 2 个 CPU 可用。

有任何想法吗 ?谢谢

0 投票
3 回答
39584 浏览

airflow - 气流回填澄清

我刚刚开始使用 Airbnb 的气流,我仍然不清楚如何/何时完成回填。

具体来说,有两个用例让我感到困惑:

  1. 如果我运行airflow scheduler几分钟,停止一分钟,然后再次重新启动,我的 DAG 似乎在前 30 秒左右运行了额外的任务,然后它继续正常运行(每 10 秒运行一次)。这些额外的任务是否“回填”了在早期运行中无法完成的任务?如果是这样,我将如何告诉气流不要回填这些任务?

  2. 如果我运行airflow scheduler几分钟,然后运行airflow clear MY_tutorial,然后重新启动airflow scheduler,它似乎运行了大量的额外任务。这些任务是否也以某种方式“回填”任务?或者我错过了什么。

目前,我有一个非常简单的 dag:

我在气流配置中更改的唯一两件事是

  1. 我从使用 sqlite db 更改为使用 postgres db
  2. 我正在使用 aCeleryExecutor而不是 aSequentialExecutor

非常感谢你的帮助!

0 投票
1 回答
624 浏览

airflow - 如何在 SlackAPIPostOperator 中获取“ds”?

我想运行一个需要 datetime 参数的 python 脚本并将其输出发布到 slack。但我不知道如何获取气流模板变量ds

假设,我有以下代码:

因为我将运行airflow backfill dag_id -s 2016-10-01,所以ds(这里是 2016-10-01)应该传递给松弛文本。

我尝试将 python 脚本输出写入文件,然后读取它并直接传递给松弛文本。但我不认为这是一个完美的解决方案。