问题标签 [airflow]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
1555 浏览

airflow - 以不同频率运行 dags | 空气流动

我一直在评估气流。我有一个用例,我有一个每小时运行一次的工作流来获取每小时的数据聚合。另一个每天运行以获得相同的每日聚合。是否可以创建一个组合工作流,其中仅当所有每小时聚合在过去一天都成功时才会运行每日聚合?我已经看到您可以创建子 dag,但是这两个 dag 可以以不同的频率运行吗?如果是怎么办?

0 投票
1 回答
4214 浏览

airflow - 调度 AirfFlow DAG 作业

我写了一个 AirFlow DAG 如下 -

根据上面的配置,作业应该每隔一分钟运行一次。但相反,它显示在输出下方

有人可以在这里指导我吗?

谢谢帕里

0 投票
3 回答
45922 浏览

airflow - 气流平行度

本地执行器在调度任务时产生新进程。它创建的进程数量是否有限制。我需要改变它。我需要知道 flow.cfg 中调度程序的“max_threads”和“parallelism”有什么区别?

0 投票
1 回答
2062 浏览

python-2.7 - 气流回填不起作用

我正在使用气流运行工作流 DAG。我设置的开始日期是 2014 年 5 月 26 日,schedule_interval 是 1 天,没有结束日期,它使用的是 Celery Executor。Airflow 从给定日期开始运行我的任务,并按预期从开始日期开始每天递增。

但问题是,它只会在开始日期后的 16 天内安排和运行 dags,然后停止。我必须重新启动调度程序服务才能安排下一组 dags 并运行。

我已经修改了下面的属性以在airflow.cfg中将它们设置为更高,但它仍然不起作用,

我的要求是让气流从开始日期到当前日期执行我的任务。看起来它可以运行的 dag 数量有限制,但我无法弄清楚。请建议。

我的气流版本是 1.7.1.2

0 投票
2 回答
3127 浏览

oozie - 子任务中的 AirFlow dag id 访问

我有一个包含三个 bash 任务的 DAG,计划每天运行。

我想在所有 bash 脚本中访问 dag 实例的唯一 ID(可能是 PID)。

有没有办法做到这一点?

我正在寻找与 Oozie 类似的功能,我们可以在工作流 xml 或 java 代码中访问 WORKFLOW_ID。

有人可以指点我有关“如何在 AirFlow DAG 中使用内置变量和自定义变量”的 AirFlow 文档吗

非常感谢帕里

0 投票
1 回答
20031 浏览

python - subDAG 在 Airflow 中究竟是如何工作的?启用 subDAG 意味着什么?

我查看了 Airflow subDAG 部分并试图在网上找到任何其他有用的东西,但是我没有找到任何详细解释如何使 subDAG 工作的东西。运行 subDAG 的要求之一是应该启用它。如何启用/禁用 subdag?

我编写了一些示例代码,但在气流中没有显示任何错误,但是当我尝试运行它时,subDAG 中的所有运算符都没有被执行。

这是我的主要 dag 代码:

在这段代码中,任务“开始”成功,但是 subdag 任务不做任何事情,既不失败也不成功。

这是我的 subDAG 代码:

此代码中的 3 个运算符获取文件“airflow.cfg”的行数,在该文件中找到“airflow_home”的值,并返回要打印的这两个值。这段代码独立工作,所以我认为这不是问题。

为了让 subDAG 执行它的操作符,我需要做些什么改变?

0 投票
4 回答
12121 浏览

airflow - 气流默认连接太多

我打开气流并检查连接,发现后面有太多连接运行。关于如何杀死我不使用的那些的任何想法,或者我很想知道运行它的最小 conn_id。

建筑学

  • LocalExecutor(与任何其他经纪人不同)
  • Postgres 作为元数据库

但是它列出了 17 个连接。

在此处输入图像描述

以下是连接列表。

这是airflow.cfg.

0 投票
2 回答
8195 浏览

owner - 我应该如何在气流中使用正确的所有者任务?

我不了解气流中的“所有者”。ower的评论是“任务的所有者,推荐使用unix用户名”。我写了一些以下代码。

}

但是我使用了命令“airflow test dagid taskid 2016-07-20”,我遇到了一些错误,... {bash_operator.py:77} INFO - put: Permission denied: user=airflow, ....

我以为我的工作是使用“最大”用户运行的,但显然是使用“气流”用户运行测试。

我希望如果我使用“最大”用户运行我的任务,我应该怎么做。

0 投票
2 回答
47187 浏览

bash - 气流将参数传递给依赖任务

将参数传递给 Airflow 中的依赖任务的方法是什么?我有很多 bashes 文件,我正在尝试将这种方法迁移到气流,但我不知道如何在任务之间传递一些属性。

这是一个真实的例子:

在 t2 中,我需要访问在 t1 中创建的目录名称。

解决方案

这不是最终的解决方案,因此欢迎改进。谢谢。

0 投票
2 回答
1247 浏览

owner - 我应该如何在 AIRFLOW 中使用“所有者”而不是 shell 所有者来运行任务

我的任务代码如下。

然后我用linux的'airflow'用户运行命令“airflow test test3 test3-task2 2016-07-25”。输出“whoami”的结果是“气流”。但我希望输出结果是任务的“所有者”。

我怎么了?

谢谢

以下是输出结果。

[2016-07-25 11:22:37,716] {bash_operator.py:64} INFO - 临时脚本位置:/tmp/airflowtmpoYNJE8//tmp/airflowtmpoYNJE8/test3-task2U1lpom

[2016-07-25 11:22:37,716] {bash_operator.py:65} INFO - 运行命令:whoami

[2016-07-25 11:22:37,722] {bash_operator.py:73} 信息 - 输出:

[2016-07-25 11:22:37,725] {bash_operator.py:77} 信息 -气流

[2016-07-25 11:22:37,725] {bash_operator.py:80} INFO - 命令以返回码 0 退出