问题标签 [mwaa]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
63 浏览

neo4j - 是否可以处理从 Airflow Task 内部关闭的任务?

我们使用 Neo4j 并从 Airflow 任务中调用它的查询。问题是当任务在 Airflow GUI 中标记为“失败”或“完成”时,这些查询通常不会停止。所以,我想找到一种方法,当任务被标记为“失败”或“完成”时,如何从当前正在运行的任务中调用终止查询。

在 Airflow 中,使用GraphDatabase.driver中的session.run(query)方法执行查询。其中GraphDatabase是 neo4j python 库的一部分

有什么简单的解决方案吗?

0 投票
0 回答
99 浏览

python - 如何使用 AWS Managed Airflow 运行 DBT 作业?

我想知道如何使用 AWS Managed Airflow 运行我的 DBT 作业。

我在 AWS 中创建了一个用于测试的环境,按照这个LINK他们失败了。

我还在 S3 中上传了requirements.txt以及plugins.zip

plugins.zip 的内容

  • 数据库
  • env_var_plugin.py

env_var_plugin.py

在此处输入图像描述

数据库 在此处输入图像描述

要求.txt

在此处输入图像描述

还将我的 dag 文件 S3 上传到 Dags/ 文件夹

0 投票
0 回答
86 浏览

airflow - 如何最好地将“ftp to s3 operator”合并到现有的 Airflow DAG 中?希望使用airflow.providers.amazon.aws.transfers.ftp_to_s3

我对 Airflow 比较陌生,正在寻求将 #following #airflow.providers.amazon.aws.transfers.ftp_to_s3 引入我的 DAG 的帮助。#参数(s3_bucket、aws_conn_id、ftp_conn_id 等)如何传递给这个操作符?这些参数会存在于引用的 python 脚本中吗?

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_api/airflow/providers/amazon/aws/transfers/ftp_to_s3/index.html

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_modules/airflow/providers/amazon/aws/transfers/ftp_to_s3.html

0 投票
1 回答
196 浏览

google-cloud-platform - MWAA 中的 apache-airflow-providers-google

我试图在 MWAA 中安装 apache-airflow-providers-google,但似乎 Airflow 没有选择库:

我有以下 requirements.txt 文件:

我做错了什么?

0 投票
0 回答
35 浏览

airflow - 使用 MWAA(气流管理气流)破坏 DAG - psycopg2

使用 Managed Airflow 2.0.2,我的 requirements.txt 看起来像

要求.txt

Apache-airflow[postgres] psycopg2-binary botocore

当我尝试导入 psycopg2 时,我得到一个损坏的 DAG 无法识别 psycopg2

我已经试过了

要求.txt

Apache-airflow[postgres] psycopg2 botocore

甚至

要求.txt

Apache-airflow[postgres] psycopg2 psycopg2-binary botocore

0 投票
1 回答
33 浏览

python - 表传输在 Airflow 中的第一次迭代后停止

我使用以下代码通过 Airflow (MWAA) 将小表从数据库 A 传输到数据库 B:

然而,代码只循环一次并且不传输任何记录,只传输目标数据库中表的模式。该表有大约 50000 条记录,但调整块大小无济于事。日志中没有错误。

该代码在 Jupyter notebook 中执行时运行良好,无需使用 Airflow Hooks。

有什么建议可能是什么问题?

0 投票
1 回答
192 浏览

amazon-web-services - 本地 MWAA (AWS Airflow) 使用 SecretsManager 作为后端“找不到凭证”

我正在运行在 Docker 上运行的 MWAA 实例的本地环境。一切正常,我尝试将 SecretsManager 设置为仅用于连接的替代后端。

我已将此添加到airflow.cfg文件中:

然后,我aws_default在 Airflow Connections 中配置了一个连接:类型:Amazon Web Services

名称:aws_default

登录:<aws_access_key>

密码:<aws_secret_access_key>

我通过使用使用此连接 ID 启动 AWSHook 的函数验证了凭证是否正常工作。

我向凭证所属的用户添加了 SecretsManager 读写策略。

然后,我尝试使用 SnowflakeOperator 对 Snowflake 运行查询,以便从 SecretsManager 获取连接,但我收到此错误,就好像后端 SecretsManager 服务无法找到使用 API 和查询密钥的凭据一样。xxxx_snowflake_operator 和 xxxx_snowflake_hook 只是对snowflake_operatorand的包装snowflake_hook,做同样的事情,没有任何改变(只有 UI 的颜色)

SecretsManager Airflow 后端为什么找不到凭证?它不使用默认的 AWS 连接 ID 来运行其 botocore API 请求吗?

更新:我通过在 ~/.aws/config 中设置一个包含 aws 访问和密钥的配置文件来使其工作。我想知道我是否可以让它使用 Airflow 中的 aws_default 连接。

0 投票
1 回答
143 浏览

airflow - MWAA requirements.txt 可以自动设置为最新版本吗?

我们有一个将 DAG 自动上传到 MWAA 的系统。

有没有办法让 S3 存储桶中的 requirements.txt 自动设置为最新版本?

我在 AWS 控制台中找不到此选项。

0 投票
1 回答
240 浏览

airflow - MWAA 未找到 aws_default 连接

我刚刚设置了 AWS MWAA(托管气流),并且正在玩在 dag 中运行一个简单的 bash 脚本。我正在阅读该任务的日志并注意到默认情况下,该任务会查找aws_default连接并尝试使用它但没有找到它。

我去了连接窗格并设置了aws_default连接,但它仍然在日志中显示相同的消息。

气流连接:aws_conn_id=aws_default

未从 Connection 检索到凭据

如何让 MWAA 识别此连接?

我的天:

编辑:现在,我只是导入一个内置函数并使用它来获取凭据。例子:

0 投票
1 回答
80 浏览

airflow - 没有工人的 MWAA

我目前对 MWAA 的理解

MWAA 将 Fargate 用于 Scheduler(s) 和 Worker(s)。MWAA 必须至少有 1 个调度程序,它必须是 Fargate,我假设 24/7 无休止的进程(以避免退役)。Scheduler Fargate 机器也在运行 Celery executor。

触发时,DAG 被 Executor 拆分为任务,每个任务都被添加到队列中,由 Workers 从中拉取。如果是 bash 运算符,任务会提交给 Fargate Worker 并在那里执行。

工人扩展由 Fargate 管理,您只需指定最小/最大计数。如果任务同时提交,工人退役被触发 - 它可能会失败(知道问题)。

问题

假设执行单任务 DAG。任务在另一个 AWS 服务上执行,即 EMR ( EmrAddStepsOperator)。

  1. EmrAddStepsOperator先提交给 MWAA Fargate Worker,然后才提交给 EMR?还是直接从 MWAA Scheduler 提交给 EMR?
  2. 如果我的所有任务都在 EMR 执行,我是否需要 MWAA 工作人员?