问题标签 [mwaa]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
neo4j - 是否可以处理从 Airflow Task 内部关闭的任务?
我们使用 Neo4j 并从 Airflow 任务中调用它的查询。问题是当任务在 Airflow GUI 中标记为“失败”或“完成”时,这些查询通常不会停止。所以,我想找到一种方法,当任务被标记为“失败”或“完成”时,如何从当前正在运行的任务中调用终止查询。
在 Airflow 中,使用GraphDatabase.driver中的session.run(query)方法执行查询。其中GraphDatabase是 neo4j python 库的一部分
有什么简单的解决方案吗?
python - 如何使用 AWS Managed Airflow 运行 DBT 作业?
我想知道如何使用 AWS Managed Airflow 运行我的 DBT 作业。
我在 AWS 中创建了一个用于测试的环境,按照这个LINK他们失败了。
我还在 S3 中上传了requirements.txt以及plugins.zip
plugins.zip 的内容
- 数据库
- env_var_plugin.py
env_var_plugin.py
要求.txt
还将我的 dag 文件 S3 上传到 Dags/ 文件夹
airflow - 如何最好地将“ftp to s3 operator”合并到现有的 Airflow DAG 中?希望使用airflow.providers.amazon.aws.transfers.ftp_to_s3
我对 Airflow 比较陌生,正在寻求将 #following #airflow.providers.amazon.aws.transfers.ftp_to_s3 引入我的 DAG 的帮助。#参数(s3_bucket、aws_conn_id、ftp_conn_id 等)如何传递给这个操作符?这些参数会存在于引用的 python 脚本中吗?
google-cloud-platform - MWAA 中的 apache-airflow-providers-google
我试图在 MWAA 中安装 apache-airflow-providers-google,但似乎 Airflow 没有选择库:
我有以下 requirements.txt 文件:
我做错了什么?
airflow - 使用 MWAA(气流管理气流)破坏 DAG - psycopg2
使用 Managed Airflow 2.0.2,我的 requirements.txt 看起来像
要求.txt
Apache-airflow[postgres] psycopg2-binary botocore
当我尝试导入 psycopg2 时,我得到一个损坏的 DAG 无法识别 psycopg2
我已经试过了
要求.txt
Apache-airflow[postgres] psycopg2 botocore
甚至
要求.txt
Apache-airflow[postgres] psycopg2 psycopg2-binary botocore
python - 表传输在 Airflow 中的第一次迭代后停止
我使用以下代码通过 Airflow (MWAA) 将小表从数据库 A 传输到数据库 B:
然而,代码只循环一次并且不传输任何记录,只传输目标数据库中表的模式。该表有大约 50000 条记录,但调整块大小无济于事。日志中没有错误。
该代码在 Jupyter notebook 中执行时运行良好,无需使用 Airflow Hooks。
有什么建议可能是什么问题?
amazon-web-services - 本地 MWAA (AWS Airflow) 使用 SecretsManager 作为后端“找不到凭证”
我正在运行在 Docker 上运行的 MWAA 实例的本地环境。一切正常,我尝试将 SecretsManager 设置为仅用于连接的替代后端。
我已将此添加到airflow.cfg
文件中:
然后,我aws_default
在 Airflow Connections 中配置了一个连接:类型:Amazon Web Services
名称:aws_default
登录:<aws_access_key>
密码:<aws_secret_access_key>
我通过使用使用此连接 ID 启动 AWSHook 的函数验证了凭证是否正常工作。
我向凭证所属的用户添加了 SecretsManager 读写策略。
然后,我尝试使用 SnowflakeOperator 对 Snowflake 运行查询,以便从 SecretsManager 获取连接,但我收到此错误,就好像后端 SecretsManager 服务无法找到使用 API 和查询密钥的凭据一样。xxxx_snowflake_operator 和 xxxx_snowflake_hook 只是对snowflake_operator
and的包装snowflake_hook
,做同样的事情,没有任何改变(只有 UI 的颜色)
SecretsManager Airflow 后端为什么找不到凭证?它不使用默认的 AWS 连接 ID 来运行其 botocore API 请求吗?
更新:我通过在 ~/.aws/config 中设置一个包含 aws 访问和密钥的配置文件来使其工作。我想知道我是否可以让它使用 Airflow 中的 aws_default 连接。
airflow - MWAA requirements.txt 可以自动设置为最新版本吗?
我们有一个将 DAG 自动上传到 MWAA 的系统。
有没有办法让 S3 存储桶中的 requirements.txt 自动设置为最新版本?
我在 AWS 控制台中找不到此选项。
airflow - MWAA 未找到 aws_default 连接
我刚刚设置了 AWS MWAA(托管气流),并且正在玩在 dag 中运行一个简单的 bash 脚本。我正在阅读该任务的日志并注意到默认情况下,该任务会查找aws_default
连接并尝试使用它但没有找到它。
我去了连接窗格并设置了aws_default
连接,但它仍然在日志中显示相同的消息。
气流连接:aws_conn_id=aws_default
未从 Connection 检索到凭据
如何让 MWAA 识别此连接?
我的天:
编辑:现在,我只是导入一个内置函数并使用它来获取凭据。例子:
airflow - 没有工人的 MWAA
我目前对 MWAA 的理解
MWAA 将 Fargate 用于 Scheduler(s) 和 Worker(s)。MWAA 必须至少有 1 个调度程序,它必须是 Fargate,我假设 24/7 无休止的进程(以避免退役)。Scheduler Fargate 机器也在运行 Celery executor。
触发时,DAG 被 Executor 拆分为任务,每个任务都被添加到队列中,由 Workers 从中拉取。如果是 bash 运算符,任务会提交给 Fargate Worker 并在那里执行。
工人扩展由 Fargate 管理,您只需指定最小/最大计数。如果任务同时提交,工人退役被触发 - 它可能会失败(知道问题)。
问题
假设执行单任务 DAG。任务在另一个 AWS 服务上执行,即 EMR ( EmrAddStepsOperator
)。
- 是
EmrAddStepsOperator
先提交给 MWAA Fargate Worker,然后才提交给 EMR?还是直接从 MWAA Scheduler 提交给 EMR? - 如果我的所有任务都在 EMR 执行,我是否需要 MWAA 工作人员?