问题标签 [apache-airflow]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
1655 浏览

python-2.7 - 气流:如何从大查询输出中获取数据可用性的响应并基于结果启动任务/子标签

需求是根据上游/依赖表的数据可用性启动 dag

同时条件检查数据可用性(在大查询的表中进行 n 次迭代)以检查数据是否可用。如果数据可用,则启动 subdag/task,否则继续循环。

很高兴看到一个清晰的示例如何使用 BigQueryOperator 或 `BigQueryValueCheckOperator' 然后执行类似这样的大查询

{Code} SELECT 1 FROM WHERE datetime BETWEEN TIMESTAMP(CURRENT_DATE()) AND TIMESTAMP(DATE_ADD(CURRENT_DATE(),1,'day')) LIMIT 1 {Code}

如果查询输出为 1(这意味着数据可用于今天的负载),则启动 dag 否则继续循环,如附图链接所示。

有没有人在 Airflow dag 中设置过这样的设计。

0 投票
0 回答
2045 浏览

airflow - Airflow dag 任务卡在运行或无状态

我创建了一个 dag,其中包含几个运行简单 bash 命令的 subdag。我可以看到,几乎从一开始,许多任务就陷入了运行或无状态模式,并且无法继续。一段时间后,我可以看到越来越多的 dag 实例被卡住,而我只剩下一个实例真正在运行。我能做些什么来确保不会发生这种情况?

这是我的一天:

0 投票
2 回答
5694 浏览

airflow - 如何设置 2 台服务器的 Airflow?

试图将 Airflow 进程拆分到 2 个服务器上。服务器 A 已经以独立模式运行,上面有所有东西,它有 DAG,我想在新设置中将它设置为工作人员,并带有一个额外的服务器。

服务器 B 是在 MySQL 上托管元数据数据库的新服务器。

我可以让服务器 A 运行 LocalExecutor,还是必须使用 CeleryExecutor?airflow scheduler必须在有 DAG 的服务器上运行吗?还是必须在集群中的每台服务器上运行?对进程之间存在哪些依赖关系感到困惑

0 投票
3 回答
7104 浏览

airflow - Airflow DAG Run 已触发,但从未执行?

我发现自己处于手动触发 DAG Run (via airflow trigger_dag datablocks_dag) 运行的情况,并且 Dag Run 显示在界面中,但随后它永远保持“运行”状态而没有实际执行任何操作。

当我在 UI 中检查此 DAG Run 时,我看到以下内容:

在此处输入图像描述

我已经start_date设置为datetime(2016, 1, 1),并schedule_interval设置为@once阅读文档的理解是,由于start_date< 现在,DAG 将被触发。@once确保它只发生一次。

我的日志文件说:

什么可能导致问题?

我是否误解了如何start_date运作?

还是schedule_interval WARNING日志文件中看似令人担忧的行可能是问题的根源?

0 投票
1 回答
1282 浏览

airflow - 每小时运行任务的apache气流外部任务传感器

在 apache 气流中,可以将外部任务传感器添加到日常工作中每小时运行的工作中。

我们有一个场景,在日常工作中,我们需要一些由日常工作更新的列。

但有时在 23:00 运行的每小时作业在每日作业开始之前并未完成。

在这种情况下,我们错过了按小时工作所做的一些更新。

我们可以从日常工作中添加对每小时工作的依赖吗?

0 投票
1 回答
4777 浏览

airflow - 动态创建任务列表

我有一个 DAG,它是通过向 DynamoDB 查询列表来创建的,对于列表中的每个项目,使用 PythonOperator 创建一个任务并将其添加到 DAG。未在下面的示例中显示,但重要的是要注意列表中的某些项目依赖于其他任务,因此我使用set_upstream它来强制执行依赖关系。

工作流.py

问题是workflow.py一遍又一遍地运行(每次任务运行时?),我的get_task_list()方法被 AWS 限制并抛出异常。

我认为这是因为无论何时run_task()调用它都会运行所有全局变量,workflow.py所以我尝试移动run_task()到一个单独的模块中,如下所示:

但这并没有改变什么。我什至尝试放入get_task_list()一个用工厂函数包装的 SubDagOperator,它的行为仍然相同。

我的问题与这些问题有关吗?

此外,为什么workflow.py如此频繁地运行以及为什么get_task_list()当任务方法不引用workflow.py并且不依赖于它时导致单个任务失败时引发的错误?

最重要的是,并行处理列表并强制列表中项目之间的任何依赖关系的最佳方法是什么?

0 投票
0 回答
79 浏览

airflow - 为什么 Airflow 会创建多个日志文件?

我最近开始研究 Airflow 调度程序。而且我一直在观察它正在为每个计划的作业创建多个日志文件。我可以知道如何将其限制为一个文件。我检查了airflow.cfg 文件,但找不到与日志文件副本数相关的任何参数。

0 投票
1 回答
1444 浏览

redis - 使用 AWS ElastiCache 请求中的 Airflow CROSSSLOT 密钥不会散列到相同的插槽错误

我在 AWS ECS 上运行 apache-airflow 1.8.1,我有一个 AWS ElastiCache 集群(redis 3.2.4)运行 2 个分片/2 个启用多可用区的节点(集群 redis 引擎)。我已经验证气流可以毫无问题地访问集群的主机/端口。

这是日志:

0 投票
0 回答
4841 浏览

multithreading - Apache 气流:使用 sqlite 时不能使用超过 1 个线程。将 max_threads 设置为 1

我已经用 postgresql 数据库设置了气流,我正在创建多个 dag

它创建动态任务,但在所有数量的动态任务上,它总是使最后一个失败并记录错误:“使用 sqlite 时不能使用超过 1 个线程。将 max_threads 设置为 1”例如:如果创建 4 个任务,则运行 3 次,并且如果创建了 2 个任务,则运行 1 次。

0 投票
2 回答
5323 浏览

python-2.7 - 虽然气流 initdb,AttributeError:模块'对象没有属性'client_auth'

我最近安装了 apache airflow 1.8.1,我执行了以下命令:

airflow initdb

返回以下错误:

我尝试了几种解决方案,但它不起作用。