问题标签 [airflow-2.x]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
228 浏览

python-3.x - 如何在 Cloud Composer 中导入自定义模块

我用 apache Airflow 创建了一个本地项目,我想在云作曲家中运行它。我的项目包含自定义模块和调用它们的主文件。
示例:从 src.kuzzle 导入 KuzzleQuery

结构:
  • 主文件
  • 源代码
    • 谜题.py


我已经在数据存储中导入了我的项目文件夹,当我刷新气流作曲家的 UI 时,我遇到了这个错误:

0 投票
1 回答
34 浏览

airflow - Airflow 2.0 - 在本地运行会一直运行该功能

我有以下任务一直在运行我知道这一点,因为它在 Snowflake 中运行查询并且我不断收到 DUO 推送通知。每一个。5秒!我能做些什么来阻止它并且只在 DAG 运行时运行它

这是任务:

这是在 sql 部分中调用的方法:

0 投票
0 回答
103 浏览

python-3.x - Apache Airflow:如何将非模板化参数传递给 Airflow 2.0 中的模板字段

我正在通过覆盖 BaseSensorOperator 创建的 Airflow 2 中设计一个自定义传感器 MySensor。我有一个名为的参数file,它作为模板字段传递给我的构造函数

template_fields = ("file",)

我有一个名为 file 的 Airflow 变量,它保存参数文件的值。然后我在我的 DAG 中实例化一个任务,如下所示。

这可以正常工作,因为它是一个模板字段,并且将使用存储在 Airflow 变量中的值。但是当我将此参数作为字符串传递时出现问题,即没有 Jinja 模板。

这给了我一个错误AttributeError: 'MySensor' object has no attribute 'file'

我在这里缺少什么吗?如何将非模板化参数传递给 Airflow 2 中的模板字段

0 投票
1 回答
46 浏览

sql-server - 将气流 2.2.2 与 SQL Server 集成为元存储面临的问题

在尝试将气流 2.2.2 与 SQL Server 集成为元存储时,我遇到了一个问题:

微软 SQL 服务器 2019

sqlalchemy.exc.IntegrityError: (pyodbc.IntegrityError) ('23000', "[23000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]当 IDENTITY_INSERT 为设置为OFF。(544) (SQLExecDirectW)") [SQL: INSERT INTO ab_permission (id, name) OUTPUT inserted.id VALUES (NEXT VALUE FOR ab_permission_id_seq, ?)] [参数: ('can_read',)] (背景此错误位于:https://sqlalche.me/e/14/gkpj)

0 投票
0 回答
57 浏览

airflow - 当它的父任务失败时如何触发Airflow中的任务,但是当它在upstream_failed时跳过它?

正如我所看到的,没有可以区分和状态的触发规则。当父进程运行但失败和根本没有运行时,我们必须采取不同的行为。failedupstream_failed

当父任务的状态为:

  • 成功 -> 运行子任务
  • 失败 -> 运行子任务
  • upstream_failed -> 跳过子任务
  • 跳过 -> 跳过子任务

我们怎样才能得到这种行为?
是否可以为此编写自定义触发规则?

我最感兴趣的是 Airflow 1.x 的答案

0 投票
1 回答
54 浏览

airflow - 在传感器超时时触发气流任务

我目前有一个PythonSensor等待 ftp 服务器上的文件。是否可以让此传感器在超时时触发任务?我正在尝试创建以下 dag:

气流传感器示意图

我已经看过了,BranchPythonOperator但如果第一次失败,我似乎不再获得重新安排任务的好处。

0 投票
0 回答
49 浏览

python - 如何使用 TriggerDagRunOperator 触发回填?

我有一个要求,我需要触发的 dagTriggerDagRunOperator来执行回填,而不仅仅是相同的执行日期。

TriggerDagOperator设置如下:

目标 dag 基本上是:

target_dag仅在今天执行而不是回填。

无论过去的 dag 运行如何,我如何强制它回填?我正在使用气流 2.0

0 投票
1 回答
135 浏览

airflow - 在 Apache Airflow 2 中监控长时间运行的任务的进度

我正在尝试将一个临时控制和监控的工作流移至 Airflow 2。该工作流由多个步骤组成,这是一个非常典型的用例,只有一个例外 - 一个步骤是一项非常长时间运行的工作。

在极少数情况下,这项工作可能需要几分钟一天(甚至两天)。该任务实际上是由不同的系统执行的(不在我的控制范围内),这里的 Airflow 只负责远程启动它并轮询状态。没有办法将任务分成更小的任务。但是,我能够在任务运行时监控任务的状态和进度。在任务执行之前,我自己也无法对任务难度做出任何假设——我完全依赖于报告的进度。

尽管总步数仍然相同,但每次 DAG 运行的时间量可能会在数量级上有所不同。因此,以某种方式将有关任务进度的知识整合到 Airflow 中会非常有帮助。任何提示如何解决这个问题?

0 投票
2 回答
51 浏览

airflow - 更改自定义运算符中的默认 XcomArg 键

我有一堆自定义运算符,我想尝试使用 XcomArg 并在我的任务中使用 .output。

例如,下面我注释掉了xcom_push返回列表:

问题是我的密钥历来是“extract_list”,我在其他地方有一些对该密钥的引用。我传递了其他 xcom(例如最大 ID/时间戳),这些 xcom 被标记为return_value.

我可以更改我推送的 xcom 的密钥吗?

这个片段有效,但关键是return_value

我已经尝试添加test = XComArg(operator=extract, key="test_key"),然后input_files=test在我的转换任务中也有,但没有运气。我想我需要覆盖 FileToAzureBlobOperator 中的默认键。

0 投票
1 回答
68 浏览

python-3.x - 使用 Airflow 写入磁盘上的文件不起作用

我正在使用 Windows 机器并创建了气流容器。我可以通过 DAG 读取本地文件系统上的数据,但无法将数据写入文件。我也尝试过给出完整路径,也尝试过不同的运算符:Python 和 Bash,但它仍然不起作用。DAG 成功,没有任何失败可显示。注意: /opt/airflow : 是 $AIRFLOW_HOME 路径

可能是什么原因?

一段代码: