问题标签 [airflow-scheduler]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
2167 浏览

airflow - Airflow - 一天运行三个不同的 DAG

我正在使用 Apache Airflow。在 Airflow 中,我可以添加类似 cron 的语法,但是有什么方法可以每天运行三个不同的 DAG?

从 cron 语法中,我只能在时间处于某个间隔内时添加。但我想在三个完全不同的时间运行工作。

例如,我想每天下午3.50、晚上7.15和晚上11.59运行一份工作。

0 投票
1 回答
3730 浏览

airflow - 气流中的事件监听器

我的任务是监听某些事件并启动其他功能。

这个函数(监听器)订阅一个 kafka 主题并永远运行,或者至少直到它得到一个“停止”事件。

将其包装为气流操作员似乎无法正常工作。意思是,如果我发送停止事件,它不会处理它,或者其他任何事情。是否可以在气流中运行繁忙的循环功能?

0 投票
2 回答
3932 浏览

airflow - 气流:每天重新执行过去 n 天的 DAG 的作业

我已安排每天运行 DAG 的执行。它可以完美运行一天。

但是,每天我不仅想重新执行当天的 {{ ds }},还想重新执行前 n 天(假设 n = 7)。

例如,在计划在“2018-01-30”运行的下一次执行中,我希望 Airflow 不仅使用执行日期“2018-01-30”运行 DAG,还希望为所有人重新运行 DAG前几天从“2018-01-23”到“2018-01-30”。

是否有一种简单的方法可以使先前的执行“无效”以便自动运行回填?

0 投票
1 回答
2284 浏览

airflow - 使用 Airflow 进行批处理,根据父任务的输出动态启动多个任务

我试图弄清楚是否可以使用 Airflow 来表达需要根据父任务的输出启动同一任务的多个实例的工作流。Airflow 支持多个工作人员,因此我天真地期望 Airflow 可用于编排涉及批处理的工作流。到目前为止,我还没有找到任何适合这个模型的食谱/方向。将 Airflow 用于如下所示的浴液处理工作流程的正确方法是什么?假设有一个 Airflow 工作人员池。

工作流示例: 1. 启动任务 A 以生成多个文件 2. 为每个文件启动任务 B 的一个实例(可能是另一个工作流) 3. 等待任务 B 的所有实例,然后启动任务 C

0 投票
1 回答
3164 浏览

airflow - 如何增加每秒排队的任务?

我正在尝试诊断性能不佳的气流管道,并想知道我应该从气流调度程序中获得什么样的性能,类似于“每秒调度的任务”。

我几乎没有排队的作业,而且我的许多任务在几秒钟内完成,所以我怀疑调度程序是限制组件,我有很多快速任务是我的错。不过,如果可以避免的话,我宁愿不重写我的 DAG。

我可以做些什么来提高调度程序排队任务的速率?


管道详细信息

这是我当前的airflow.cfg 的样子。

我只有两个 dags 在运行。一个每 5 分钟安排一次,另一个很少由第一个触发。我目前正试图以这种频率回填几年,但可能需要改变我的方法:

在此处输入图像描述

至于工作节点:我目前有 4 个相当强大的服务器运行,在磁盘、网络、cpu、RAM、交换方面的资源使用率不到 10%。关闭 3 个工作人员对我的任务吞吐量没有影响,并且服务器保持打开状态甚至几乎没有记录工作量的变化。

0 投票
3 回答
2134 浏览

python - Apache Airflow 没有安排任务

我已经安装了 apache-airflow(版本 v1.9.0)和 python 2.7。为了测试它是否安装正确,我尝试从浏览器的交互式视图中触发教程 DAG。该界面显示 DAG 正在运行,但调度程序未显示任何活动。

以下是我尝试的步骤

  1. 安装气流
  1. 安装 crypto 以使用 fernet 密钥设置加密
  1. 生成 fernet_key 并添加到 airflow.cfg 文件中:
  1. 初始化一个气流sqlite db
  1. 启动气流网络服务器
  1. 在不同的窗口中启动气流调度程序
  1. tutotial在 Airflow 页面上触发DAGlocalhost:8080

执行这些步骤后,我无法在调度程序窗口中看到任何动作,这只是让我显示

我试过在本地环境和虚拟环境中运行它。我也尝试过SequentialExecutor运行LocalExecutor

即使我尝试通过终端触发任务也不会触发任务airflow trigger_dag tutorial

我正在使用 mac OS High Sierra 版本 10.13.3

0 投票
3 回答
12262 浏览

python - 运行时的气流动态任务

关于“动态任务”的其他问题似乎涉及在计划或设计时动态构建 DAG。我有兴趣在执行期间将任务动态添加到 DAG。

这种幼稚的实现似乎不起作用 - 虚拟任务永远不会出现在 UI 中。

在执行期间向 DAG 添加新运算符的正确方法是什么?可能吗?

0 投票
1 回答
956 浏览

airflow - 气流 - 分配前引用的局部变量“文件名”

我在气流中遇到了一个烦人的问题,它不断在 UI 中排队很多任务,为了让它们保持运行,我必须重新启动调度程序和工作人员。我的 Airflow 配置使用 CeleryExecutor,在 Reddis 的帮助下运行在 2 个工作人员中。

我查看了工人的日志,它向我展示了这一点:

我遵循了这个解决方案,指示在气流运行命令之后使用--raw来查看真正的异常,它说以下内容:

有没有人有同样的问题或知道如何解决它?

0 投票
2 回答
4308 浏览

airflow - 如何在 DEV 和 PROD 环境之间迁移气流变量?

我们正在使用气流来安排我们的数据管道,作为其中的一部分,我们还在气流管理中添加了一些连接和变量。

在 DEV 中一切正常,现在我们要设置 PROD 环境。我们如何将这些值迁移到 PROD 环境中。

0 投票
4 回答
10193 浏览

airflow - Airflow:为什么有运营商的 start_date?

我不明白为什么我们需要操作员(任务实例)的“开始日期”。我们传递给 DAG 的那个不就足够了吗?

此外,如果当前时间是 2018 年 2 月 7 日上午 8 点 30 点 UTC,现在我将 dag 的 start_date 设置为 2018 年 2 月 7 日上午 0 点,我的计划间隔的 cron 表达式为 30 9 * * *(每天上午 9 点 30 分,即期待在接下来的 1 小时内运行)。我的 DAG 会在今天上午 9 点 30 分还是明天(2 月 8 日上午 9 点 30 分)运行?