问题标签 [airflow]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python - 气流数据库会话不提供任何环境变量
作为一个 Airflow 和 Python 新手,我什至不知道我问的问题是否正确,但无论如何都要问。我在 CentOS 系统上配置了气流。使用远程 MySql 实例作为后端。在我的代码中,需要获取一些变量,代码如下所示:
我收到以下错误:
回溯(最后一次调用):文件“test2.py”,第 5 行,在 local_env['SHELL'] = Variable.get('SHELL') 文件“/com/work/airflowenv/lib/python2.7/site -packages/airflow/utils/db.py”,第 53 行,在包装结果 = func(*args, **kwargs) 文件“/com/work/airflowenv/lib/python2.7/site-packages/airflow/models .py",第 3134 行,在 get raise ValueError('Variable {} 不存在'.format(key)) ValueError: 变量 SHELL 不存在
在models.py的这段代码中,正是Variable.get()方法抛出了异常:
其中 session.query 已经产生无。不太明白这里是如何注入会话的。以及为什么没有设置这些会话变量。我们应该在远程 MySQL 实例上设置一些东西吗?
顺便说一句,我们在具有本地 mysql 实例的另一台机器上有另一个相同的气流实例。并且单独运行我提供的脚本没有问题:
[2016-09-27 01:54:48,341] { init .py:36} 信息 - 使用执行器
本地执行器
/bin/bash /bin/bash
设置气流时我错过了什么?谢谢,
python - 使用 Airflow UI 执行整个 DAG
我是气流的新手,我们有一个有 3 个任务的 DAG。目前我们正在使用 Celery Executor,因为我们需要灵活地运行单个任务。我们不想安排工作流程,现在它将是手动触发。有什么方法可以使用 Airflow UI 执行整个工作流程(与我们在 oozie 中的相同)?
一次执行一项任务很痛苦。
python - Airflow 无法从 UI 一次性执行所有相关任务
我的 DAG 有 3 个任务,我们正在使用 Celery 执行器,因为我们必须从 UI 触发单个任务。我们能够从 UI 执行单个任务。
我们目前面临的问题是,尽管我们已经设置了任务依赖项,但我们无法一次性从 UI 执行 DAG 的所有步骤。
我们可以从命令行执行完整的 DAG,但有没有办法通过 UI 执行相同的操作?
airflow - 气流:1.7:DAG 未在 GUI 中更新
我test1.py
在目录下定义了我的 dag 文件/opt/airflow/dags
。最初,该文件只有一个任务,test1_task01
. 在我的 GUI 中,dag 只有一项任务才能正确显示。但是,当我向我的 DAG 添加另一个任务时,如下所示:
我在我的 GUI 中没有看到更新的 DAG。GUI 仍然有旧的 DAG。airflow list_tasks <dagname>
显示正确的 DAG 关系并airflow scheduler
执行最新版本,但 GUI 不知何故并未提取最新信息。
如何通过 GUI 获得 DAG 的更新视图?
java - 如何在 Airflow 中运行 Spark 代码?
地球人你好!我正在使用 Airflow 来安排和运行 Spark 任务。这次我发现的只是 Airflow 可以管理的 python DAG。
DAG 示例:
问题是我不擅长 Python 代码并且有一些用 Java 编写的任务。我的问题是如何在 python DAG 中运行 Spark Java jar?或者也许还有其他方法可以做到这一点?我发现火花提交:http
://spark.apache.org/docs/latest/submitting-applications.html但我不知道如何将所有东西连接在一起。也许有人以前使用过它并且有工作示例。感谢您的时间!
python - BaseOperator.xcom_pull 中的上下文参数是什么
我正在阅读 API Docs,但不清楚BaseOperator.xcom_pull的上下文参数是什么。
我以为会是dag.default_args,但我收到了KeyError: 'ti'
我在此处的 push() 示例之后的先前任务中执行了 xcom_push 。
airflow - 如何在 Airflow 中动态创建的任务之间创建依赖关系
我正在使用以下代码创建动态任务。我想创建对这些动态创建的任务的依赖。例如,runStep_0 应该依赖于 runStep_1 等。
python - 气流不会触发带有“LocalExecutor”的并发 DAG
我正在使用气流 1.7.1.3。
我对并发 DAG / 任务有疑问。当一个 DAG 运行时,调度程序不再启动其他 DAG。调度程序似乎完全冻结(不再有日志)......直到正在运行的 DAG 完成。然后,新的 DAGrun 被触发。我的不同任务是长时间运行的 ECS 任务(~10 分钟)
我使用LocalExecutor
并且我让默认配置大约parallelism=32
和dag_concurrency=16
. 我自动使用airflow scheduler -n 20
并重新启动它,并为我'depends_on_past': False
的所有 DAG 声明进行设置。
有关信息,我在 ECS 集群中运行的容器中部署了气流。max_threads = 2
我只有 2 个 CPU 可用。
有任何想法吗 ?谢谢
airflow - 气流回填澄清
我刚刚开始使用 Airbnb 的气流,我仍然不清楚如何/何时完成回填。
具体来说,有两个用例让我感到困惑:
如果我运行
airflow scheduler
几分钟,停止一分钟,然后再次重新启动,我的 DAG 似乎在前 30 秒左右运行了额外的任务,然后它继续正常运行(每 10 秒运行一次)。这些额外的任务是否“回填”了在早期运行中无法完成的任务?如果是这样,我将如何告诉气流不要回填这些任务?如果我运行
airflow scheduler
几分钟,然后运行airflow clear MY_tutorial
,然后重新启动airflow scheduler
,它似乎运行了大量的额外任务。这些任务是否也以某种方式“回填”任务?或者我错过了什么。
目前,我有一个非常简单的 dag:
我在气流配置中更改的唯一两件事是
- 我从使用 sqlite db 更改为使用 postgres db
- 我正在使用 a
CeleryExecutor
而不是 aSequentialExecutor
非常感谢你的帮助!
airflow - 如何在 SlackAPIPostOperator 中获取“ds”?
我想运行一个需要 datetime 参数的 python 脚本并将其输出发布到 slack。但我不知道如何获取气流模板变量ds
。
假设,我有以下代码:
因为我将运行airflow backfill dag_id -s 2016-10-01
,所以ds
(这里是 2016-10-01)应该传递给松弛文本。
我尝试将 python 脚本输出写入文件,然后读取它并直接传递给松弛文本。但我不认为这是一个完美的解决方案。