问题标签 [airflow-2.x]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
53 浏览

airflow - 如何使用 @dag 和 @task 装饰器动态生成 DAG?

这篇文章中的代码 -在 Airflow 中动态生成 DAG 会动态生成 DAG。它使用 PythonOperator 来定义一个任务。

是否可以使用 @dag 和 @task 装饰器动态生成 DAG?

这段代码运行并且 DAG 出现在 Airflow UI 中,但我认为 DAG 无论如何都会出现,即使globals()[dag_id] = dag要被删除。

@tags在 Airflow 2.x 中使用和@task装饰器动态生成标签的正确方法是什么?

0 投票
0 回答
18 浏览

airflow - 气流在超时前将数据推送到 x-com

有没有办法在传感器(BaseSensorOperator)超时之前将数据推送到 x-com?

我正在使用传感器的“on_failure_callback”来捕捉故障/超时。但是“on_faliure_callback”发生在超时之后。我想在超时之前将数据推送到 x-com。

0 投票
0 回答
82 浏览

amazon-web-services - EMR 上的 Spark 作业提交错误:java.net.URISyntaxException:索引 3 处的预期方案特定部分:s3

我通过 AWS 的 Managed Airflow 服务 (MWAA) 向 EMR 提交 Spark 作业。这些作业在 MWAA 1.10.12 版中运行良好。最近,AWS 发布了更新版本的 MWAA,即 2.0.2。我用这个版本创建了一个新环境,并尝试将相同的作业提交给 EMR。但它失败并出现以下错误:

spark-submit命令如下所示:

作业提交在 10 秒内失败。因此,未创建 YARN 应用程序 ID。

我试图解决的错误:

  1. 我将亚马逊相关包添加到requirements.txt
  1. 我将导入语句从:

  1. 将 URI 方案更改为 s3n 和 s3a

我查看了有关 MWAA 以及 Airflow 2.0.2 的官方文档和博客,并进行了上述更改。但到目前为止没有任何效果。我寻求帮助以尽早解决此错误。提前致谢

0 投票
1 回答
42 浏览

airflow - 气流 无 计划不起作用。任务仍在自动运行

我有一个不想安排的气流。我schedule_interval:None在我的 dag 文件中使用,但 dag 在部署后仍会自动运行。使用的气流版本:2.1.0

附气流截图。 在此处输入图像描述 我在我的 dag 文件中使用以下 python 代码。

此外,我已将气流环境变量设置为以下内容:

  • AIRFLOW__CORE__LOAD_EXAMPLES: "False"
  • AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL: 30
  • AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "False"
0 投票
0 回答
53 浏览

python-3.x - 新的 DAG / 对 DAG 的更改未被 Airflow 拾取

问题 :

新的 DAG 或对现有 DAG 的更改不会显示在 Airflow Web 服务器上以在应用程序中使用。

例如。假设我在 DAG 目录中添加了一个新的 DAG:

有什么作用:

  • 如果我跑步,$ airflow dags list那么 dag 确实会出现。
  • 同样,如果我使用select dag_id from dag;然后查看数据库,则存在新的 dag。因此,它正在被拾取并放入数据库中。

什么不起作用:

  • 对 DAG 的更改未显示在 Web 应用程序中
  • 如果我在表中查看数据库中的 DAG 源代码dag_code,则源代码没有被更新
  • 如果我重新启动或停止/启动网络服务器和/或调度程序,则 DAG 仍然不会出现在网络服务器中或在dag_code表中更改。

最后什么有效:

  • 如果我$ airflow db init再次运行,那么更改会被拾取并且一切正常......

所以我的系统稳定且可用,因为我正在破解以下功能:$ airflow db init. 由于运行此命令不会影响数据库中的数据,因此我实际上可以像这样工作,并且每次发生更改时都运行该命令。但我很担心,因为这并没有按预期工作,它可能掩盖了更深层次的问题。

任何帮助将不胜感激。我在下面列出了我的系统规格和气流设置。

系统规格和气流设置:

系统规格

  • 操作系统:CentOS Linux 7(核心)
  • Python 虚拟环境:Python 3.6.7、Airflow 2.1.3、pip 21.3.1
  • systemctl --版本:systemd 219
  • psql (PostgreSQL) 9.2.23

气流设置:

airflow.cfg相关参数:

核心地点:

我正在使用 user: 的 AWS EC2 实例上运行svc-air-analytics。关键位置:

  • airflow.cfg地点:/home/svc-air-analytics/airflow/airflow.cfg
  • dags地点 :/home/svc-air-analytics/airflow/dags/
  • Python虚拟环境位置:/home/env_svc_air_analytics

Systemctl 设置(运行网络服务器和调度程序):

  • 环境文件/etc/sysconfig/airflow::
  • 调度器/usr/lib/systemd/system/airflow-scheduler.service::
  • 网络服务器:
0 投票
0 回答
34 浏览

airflow - 哪些 Airflow 权限允许通过 API 触发 DAG Run?

使用 Airflow 2.0.2,我正在尝试使用气流 API 来触发 DAG 运行。当我运行一个简单的 GET 时

我得到预期的结果:

所以用户fooUser确实对 API 有一些基本的访问权限。但是,试图运行

我明白了

如果我授予用户角色并使用相同的fooUser命令,我成功获得Admincurl

我不希望这个用户有Admin权限。我只想让他们使用 API 触发 DAG 运行。但是查看授予的权限列表,Admin我无法确定我fooUser需要哪些才能完成此操作。

用户需要哪些特定权限才能被允许使用 Airflow API 触发 DAG 运行?

0 投票
1 回答
163 浏览

google-cloud-platform - GCP ComposerV2 缺少日志文件

我们使用最新的气流版本部署了 GCP ComposerV2。它完美地工作。但有时“airflow_monitoring”预定义的 DAG 会崩溃。

以下是该问题的日志:

我们没有改变任何东西,这个问题是随机发生的。

这是“airflow_monitoring”预定义 DAG 的代码:

0 投票
2 回答
106 浏览

python - 将变量从 DAG 传递到外部函数

我有以下两个文件。一个带有 DAG 和两个任务(DummyOperator 和 TaskGroup)。

第二个文件是在第一个文件中创建并返回调用的任务组的方法。

我的问题如下:在第一个文件中,我将变量 (input_text) 传递给创建 TaskGroup 的方法,这又将 input_text 传递给 PythonOperator,它只是打印它。我不知道为什么变量没有从 DAG 传递给方法。当我打印它时,我有:

我是否忘记了有关 DAG 生命周期的一些基本信息?是否有另一种方法可以将变量传递给创建任务组的方法?

提前致谢。


更新

当我尝试编写一段代码来复制我的问题(基本问题是私有代码,工作)时,我更改了一个变量的名称,而重命名正是我问题的根源,这就是我放在这里的代码段起作用的原因为 LD Nicolas May。

那是一团糟:

似乎我不能在 op_kwargs 中使用键名,templates_dict因为那是 PythonOperator 参数。

对不起,乱七八糟。

0 投票
0 回答
109 浏览

airflow - Airflow 2.2.0 / Flask-Appbuilder / 任务退出并返回代码 Negsignal.SIGKILL

从 Airflow 2.1.4 升级到 2.2.0(或 2.2.1 或 2.2.2)后,我们有许多 DAG 无法执行。Airflow 提供的错误是“Negsignal.SIGKILL”。当使用 SequentialExecutor 在本地运行以及部署到 EKS Kubernetes 集群时,会发生这种情况。

我们似乎无法找到有关 DAG 的任何问题,它们非常简单。有些 DAG 运行良好,有些则不行。

有这样的报道吗?在这一点上,问题似乎与 Airflow 2.2.x 系列有关,因为降级到 2.1.x 可以解决问题。

任何建议将不胜感激,因为我们需要升级以修复一些其他已知问题。

更新 12-14

我们已将问题缩小到使用 flask-appbuilder 模块的 DAG。在一些地方,我们使用 cached_app() 方法来查询 Airflow 用户和角色以执行维护任务。Airflow 2.2.x 中似乎发生了一些变化,导致容器立即终止。

0 投票
0 回答
120 浏览

kubernetes - 让 Airflow 在不指定密钥的情况下加载 Kubernetes 密钥中的所有密钥

我正在使用带有 Airflow 2.1.4 的 Google Cloud Composer 1.17.7。我主要关注这些文档。

我创建了一个 Kubernetes 机密,如下所示:

要加载此密钥并将其在 Airflow 中用作环境变量,在我看来,我必须为airflow.kubernetes.secret.SecretK8s 密钥中包含的每个密钥创建一个对象。

当有很多键要使用时,这变得很麻烦。我想出了一个 for 循环解决方案,它仍然迫使我指定所有需要的键:

我想知道是否有一种方法可以在 Airflow 中加载 K8s 机密的所有密钥,而无需显式指定密钥。例如,能够以编程方式访问 K8s 密钥中包含的所有密钥的列表(例如k8sSecret.keys().