问题标签 [airflow-2.x]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
airflow - 如何使用 @dag 和 @task 装饰器动态生成 DAG?
这篇文章中的代码 -在 Airflow 中动态生成 DAG 会动态生成 DAG。它使用 PythonOperator 来定义一个任务。
是否可以使用 @dag 和 @task 装饰器动态生成 DAG?
这段代码运行并且 DAG 出现在 Airflow UI 中,但我认为 DAG 无论如何都会出现,即使globals()[dag_id] = dag
要被删除。
@tags
在 Airflow 2.x 中使用和@task
装饰器动态生成标签的正确方法是什么?
airflow - 气流在超时前将数据推送到 x-com
有没有办法在传感器(BaseSensorOperator)超时之前将数据推送到 x-com?
我正在使用传感器的“on_failure_callback”来捕捉故障/超时。但是“on_faliure_callback”发生在超时之后。我想在超时之前将数据推送到 x-com。
amazon-web-services - EMR 上的 Spark 作业提交错误:java.net.URISyntaxException:索引 3 处的预期方案特定部分:s3
我通过 AWS 的 Managed Airflow 服务 (MWAA) 向 EMR 提交 Spark 作业。这些作业在 MWAA 1.10.12 版中运行良好。最近,AWS 发布了更新版本的 MWAA,即 2.0.2。我用这个版本创建了一个新环境,并尝试将相同的作业提交给 EMR。但它失败并出现以下错误:
spark-submit命令如下所示:
作业提交在 10 秒内失败。因此,未创建 YARN 应用程序 ID。
我试图解决的错误:
- 我将亚马逊相关包添加到
requirements.txt
:
- 我将导入语句从:
至
- 将 URI 方案更改为 s3n 和 s3a
我查看了有关 MWAA 以及 Airflow 2.0.2 的官方文档和博客,并进行了上述更改。但到目前为止没有任何效果。我寻求帮助以尽早解决此错误。提前致谢
python-3.x - 新的 DAG / 对 DAG 的更改未被 Airflow 拾取
问题 :
新的 DAG 或对现有 DAG 的更改不会显示在 Airflow Web 服务器上以在应用程序中使用。
例如。假设我在 DAG 目录中添加了一个新的 DAG:
有什么作用:
- 如果我跑步,
$ airflow dags list
那么 dag 确实会出现。 - 同样,如果我使用
select dag_id from dag;
然后查看数据库,则存在新的 dag。因此,它正在被拾取并放入数据库中。
什么不起作用:
- 对 DAG 的更改未显示在 Web 应用程序中
- 如果我在表中查看数据库中的 DAG 源代码
dag_code
,则源代码没有被更新 - 如果我重新启动或停止/启动网络服务器和/或调度程序,则 DAG 仍然不会出现在网络服务器中或在
dag_code
表中更改。
最后什么有效:
- 如果我
$ airflow db init
再次运行,那么更改会被拾取并且一切正常......
所以我的系统稳定且可用,因为我正在破解以下功能:$ airflow db init
. 由于运行此命令不会影响数据库中的数据,因此我实际上可以像这样工作,并且每次发生更改时都运行该命令。但我很担心,因为这并没有按预期工作,它可能掩盖了更深层次的问题。
任何帮助将不胜感激。我在下面列出了我的系统规格和气流设置。
系统规格和气流设置:
系统规格
- 操作系统:CentOS Linux 7(核心)
- Python 虚拟环境:Python 3.6.7、Airflow 2.1.3、pip 21.3.1
- systemctl --版本:systemd 219
- psql (PostgreSQL) 9.2.23
气流设置:
airflow.cfg
相关参数:
核心地点:
我正在使用 user: 的 AWS EC2 实例上运行svc-air-analytics
。关键位置:
airflow.cfg
地点:/home/svc-air-analytics/airflow/airflow.cfg
dags
地点 :/home/svc-air-analytics/airflow/dags/
- Python虚拟环境位置:
/home/env_svc_air_analytics
Systemctl 设置(运行网络服务器和调度程序):
- 环境文件
/etc/sysconfig/airflow
::
- 调度器
/usr/lib/systemd/system/airflow-scheduler.service
::
- 网络服务器:
airflow - 哪些 Airflow 权限允许通过 API 触发 DAG Run?
使用 Airflow 2.0.2,我正在尝试使用气流 API 来触发 DAG 运行。当我运行一个简单的 GET 时
我得到预期的结果:
所以用户fooUser
确实对 API 有一些基本的访问权限。但是,试图运行
我明白了
如果我授予用户角色并使用相同的fooUser
命令,我成功获得Admin
curl
我不希望这个用户有Admin
权限。我只想让他们使用 API 触发 DAG 运行。但是查看授予的权限列表,Admin
我无法确定我fooUser
需要哪些才能完成此操作。
用户需要哪些特定权限才能被允许使用 Airflow API 触发 DAG 运行?
google-cloud-platform - GCP ComposerV2 缺少日志文件
我们使用最新的气流版本部署了 GCP ComposerV2。它完美地工作。但有时“airflow_monitoring”预定义的 DAG 会崩溃。
以下是该问题的日志:
我们没有改变任何东西,这个问题是随机发生的。
这是“airflow_monitoring”预定义 DAG 的代码:
python - 将变量从 DAG 传递到外部函数
我有以下两个文件。一个带有 DAG 和两个任务(DummyOperator 和 TaskGroup)。
第二个文件是在第一个文件中创建并返回调用的任务组的方法。
我的问题如下:在第一个文件中,我将变量 (input_text) 传递给创建 TaskGroup 的方法,这又将 input_text 传递给 PythonOperator,它只是打印它。我不知道为什么变量没有从 DAG 传递给方法。当我打印它时,我有:
我是否忘记了有关 DAG 生命周期的一些基本信息?是否有另一种方法可以将变量传递给创建任务组的方法?
提前致谢。
更新
当我尝试编写一段代码来复制我的问题(基本问题是私有代码,工作)时,我更改了一个变量的名称,而重命名正是我问题的根源,这就是我放在这里的代码段起作用的原因为 LD Nicolas May。
那是一团糟:
似乎我不能在 op_kwargs 中使用键名,templates_dict
因为那是 PythonOperator 参数。
对不起,乱七八糟。
airflow - Airflow 2.2.0 / Flask-Appbuilder / 任务退出并返回代码 Negsignal.SIGKILL
从 Airflow 2.1.4 升级到 2.2.0(或 2.2.1 或 2.2.2)后,我们有许多 DAG 无法执行。Airflow 提供的错误是“Negsignal.SIGKILL”。当使用 SequentialExecutor 在本地运行以及部署到 EKS Kubernetes 集群时,会发生这种情况。
我们似乎无法找到有关 DAG 的任何问题,它们非常简单。有些 DAG 运行良好,有些则不行。
有这样的报道吗?在这一点上,问题似乎与 Airflow 2.2.x 系列有关,因为降级到 2.1.x 可以解决问题。
任何建议将不胜感激,因为我们需要升级以修复一些其他已知问题。
更新 12-14
我们已将问题缩小到使用 flask-appbuilder 模块的 DAG。在一些地方,我们使用 cached_app() 方法来查询 Airflow 用户和角色以执行维护任务。Airflow 2.2.x 中似乎发生了一些变化,导致容器立即终止。
kubernetes - 让 Airflow 在不指定密钥的情况下加载 Kubernetes 密钥中的所有密钥
我正在使用带有 Airflow 2.1.4 的 Google Cloud Composer 1.17.7。我主要关注这些文档。
我创建了一个 Kubernetes 机密,如下所示:
要加载此密钥并将其在 Airflow 中用作环境变量,在我看来,我必须为airflow.kubernetes.secret.Secret
K8s 密钥中包含的每个密钥创建一个对象。
当有很多键要使用时,这变得很麻烦。我想出了一个 for 循环解决方案,它仍然迫使我指定所有需要的键:
我想知道是否有一种方法可以在 Airflow 中加载 K8s 机密的所有密钥,而无需显式指定密钥。例如,能够以编程方式访问 K8s 密钥中包含的所有密钥的列表(例如k8sSecret.keys()
.