问题标签 [airflow-2.x]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python-3.x - 如何在 Cloud Composer 中导入自定义模块
我用 apache Airflow 创建了一个本地项目,我想在云作曲家中运行它。我的项目包含自定义模块和调用它们的主文件。
示例:从 src.kuzzle 导入 KuzzleQuery
结构:
- 主文件
- 源代码
- 谜题.py
我已经在数据存储中导入了我的项目文件夹,当我刷新气流作曲家的 UI 时,我遇到了这个错误:
airflow - Airflow 2.0 - 在本地运行会一直运行该功能
我有以下任务一直在运行我知道这一点,因为它在 Snowflake 中运行查询并且我不断收到 DUO 推送通知。每一个。5秒!我能做些什么来阻止它并且只在 DAG 运行时运行它
这是任务:
这是在 sql 部分中调用的方法:
python-3.x - Apache Airflow:如何将非模板化参数传递给 Airflow 2.0 中的模板字段
我正在通过覆盖 BaseSensorOperator 创建的 Airflow 2 中设计一个自定义传感器 MySensor。我有一个名为的参数file
,它作为模板字段传递给我的构造函数
template_fields = ("file",)
我有一个名为 file 的 Airflow 变量,它保存参数文件的值。然后我在我的 DAG 中实例化一个任务,如下所示。
这可以正常工作,因为它是一个模板字段,并且将使用存储在 Airflow 变量中的值。但是当我将此参数作为字符串传递时出现问题,即没有 Jinja 模板。
这给了我一个错误AttributeError: 'MySensor' object has no attribute 'file'
。
我在这里缺少什么吗?如何将非模板化参数传递给 Airflow 2 中的模板字段
sql-server - 将气流 2.2.2 与 SQL Server 集成为元存储面临的问题
在尝试将气流 2.2.2 与 SQL Server 集成为元存储时,我遇到了一个问题:
微软 SQL 服务器 2019
sqlalchemy.exc.IntegrityError: (pyodbc.IntegrityError) ('23000', "[23000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]当 IDENTITY_INSERT 为设置为OFF。(544) (SQLExecDirectW)") [SQL: INSERT INTO ab_permission (id, name) OUTPUT inserted.id VALUES (NEXT VALUE FOR ab_permission_id_seq, ?)] [参数: ('can_read',)] (背景此错误位于:https://sqlalche.me/e/14/gkpj)
airflow - 当它的父任务失败时如何触发Airflow中的任务,但是当它在upstream_failed时跳过它?
正如我所看到的,没有可以区分和状态的触发规则。当父进程运行但失败和根本没有运行时,我们必须采取不同的行为。failed
upstream_failed
当父任务的状态为:
- 成功 -> 运行子任务
- 失败 -> 运行子任务
- upstream_failed -> 跳过子任务
- 跳过 -> 跳过子任务
我们怎样才能得到这种行为?
是否可以为此编写自定义触发规则?
我最感兴趣的是 Airflow 1.x 的答案
airflow - 在传感器超时时触发气流任务
我目前有一个PythonSensor
等待 ftp 服务器上的文件。是否可以让此传感器在超时时触发任务?我正在尝试创建以下 dag:
我已经看过了,BranchPythonOperator
但如果第一次失败,我似乎不再获得重新安排任务的好处。
python - 如何使用 TriggerDagRunOperator 触发回填?
我有一个要求,我需要触发的 dagTriggerDagRunOperator
来执行回填,而不仅仅是相同的执行日期。
TriggerDagOperator
设置如下:
目标 dag 基本上是:
target_dag
仅在今天执行而不是回填。
无论过去的 dag 运行如何,我如何强制它回填?我正在使用气流 2.0
airflow - 在 Apache Airflow 2 中监控长时间运行的任务的进度
我正在尝试将一个临时控制和监控的工作流移至 Airflow 2。该工作流由多个步骤组成,这是一个非常典型的用例,只有一个例外 - 一个步骤是一项非常长时间运行的工作。
在极少数情况下,这项工作可能需要几分钟到一天(甚至两天)。该任务实际上是由不同的系统执行的(不在我的控制范围内),这里的 Airflow 只负责远程启动它并轮询状态。没有办法将任务分成更小的任务。但是,我能够在任务运行时监控任务的状态和进度。在任务执行之前,我自己也无法对任务难度做出任何假设——我完全依赖于报告的进度。
尽管总步数仍然相同,但每次 DAG 运行的时间量可能会在数量级上有所不同。因此,以某种方式将有关任务进度的知识整合到 Airflow 中会非常有帮助。任何提示如何解决这个问题?
airflow - 更改自定义运算符中的默认 XcomArg 键
我有一堆自定义运算符,我想尝试使用 XcomArg 并在我的任务中使用 .output。
例如,下面我注释掉了xcom_push
返回列表:
问题是我的密钥历来是“extract_list”,我在其他地方有一些对该密钥的引用。我传递了其他 xcom(例如最大 ID/时间戳),这些 xcom 被标记为return_value
.
我可以更改我推送的 xcom 的密钥吗?
这个片段有效,但关键是return_value
:
我已经尝试添加test = XComArg(operator=extract, key="test_key")
,然后input_files=test
在我的转换任务中也有,但没有运气。我想我需要覆盖 FileToAzureBlobOperator 中的默认键。
python-3.x - 使用 Airflow 写入磁盘上的文件不起作用
我正在使用 Windows 机器并创建了气流容器。我可以通过 DAG 读取本地文件系统上的数据,但无法将数据写入文件。我也尝试过给出完整路径,也尝试过不同的运算符:Python 和 Bash,但它仍然不起作用。DAG 成功,没有任何失败可显示。注意: /opt/airflow : 是 $AIRFLOW_HOME 路径
可能是什么原因?
一段代码: