问题标签 [mwaa]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
447 浏览

airflow - Amazon Managed Airflow (MWAA) 导入自定义插件

我正在设置 AWS MWAA 实例,但在导入自定义插件时遇到问题。

我的本地项目结构如下所示:

我尝试在 s3 存储桶中匹配此结构:

但是,当我在本地项目上使用自定义运算符时,导入的工作方式如下 -

在 MWAA 上它只承认这样的进口 -

有没有办法让 MWAA 将进口识别为本地(来自运营商)?我应该以某种方式将文件上传到s3吗?

我还尝试上传 plugins.zip 文件,但没有成功:

0 投票
1 回答
365 浏览

amazon-web-services - MWAA Airflow Scaling:当我必须运行频繁且耗时的脚本时该怎么办?(Negsignal.SIGKILL)

我的 AWS 账户中有一个 MWAA Airflow 环境。我正在设置的 DAG 应该从 S3 存储桶 A 读取大量数据,过滤我想要的内容并将过滤后的结果转储到 S3 存储桶 B。它需要每分钟读取一次,因为数据每分钟都在进来。每次运行处理大约 200MB 的 json 数据。

我的初始设置是使用 env 类mw1.small和 10 台工作机器,如果我在这个设置中只运行一次任务,每次运行大约需要 8 分钟才能完成,但是当我开始计划每分钟运行一次时,大多数都无法完成,开始需要更长的时间来运行(大约 18 分钟)并显示错误消息:

我尝试将 env 类扩展到mw1.large15 个工作人员,在错误出现之前能够完成更多工作,但仍然无法赶上每分钟摄取的速度。在达到工人机器最大值之前,该Negsignal.SIGKILL错误仍会显示。

在这一点上,我应该怎么做才能扩大规模?我可以想象打开另一个 Airflow 环境,但这并没有什么意义。必须有一种方法可以在一个环境中做到这一点。

0 投票
0 回答
65 浏览

airflow-scheduler - 气流任务正在运行但计划上升 - MWAA

我在 MWAA 上运行 Airflow 1.10.12 我们在资产列表上每 30 分钟/1 小时/2 小时运行一次 dags 执行 - 我们现在有数百个资产,并且计划的任务数量不断增加。(Dag 已成功处理,但调度速度不够快?)

MWWA 气流配置:5 个工作人员 mw1.medium 旋转 EKS 实例进行计算 气流配置

这是 dags 元数据,您可以在其中看到计划的数字上升:
dags 元数据

随着时间的推移,活动任务的数量似乎并不疯狂 - 工作人员并未饱和。我想知道 dag_concurrency 是否存在瓶颈,因为我认为(?)落后的 dag 是那些有很多任务(5 个任务)的 dag。

我不明白什么?您对我的气流配置有什么建议/调整吗?谢谢

0 投票
0 回答
161 浏览

amazon-web-services - PythonVirtualenvOperator 作为 Amazon MWAA 中的自定义插件?

有人尝试过Amazon MWAA(Amazon Managed Workflows for Apache Airflow)中的 Apache Airflow PythonVirtualenvOperator的自定义插件吗?

我正确地遵循了那里描述的所有步骤,但是当我运行 dag 时出现“没有这样的文件或目录:'virtualenv':'virtualenv' ”错误。

0 投票
1 回答
195 浏览

amazon-web-services - AWS MWAA Cloudformation 堆栈创建失败并出现 NotStabilized

Cloudformation 在创建 AWS MWAA 集群时返回以下错误:

资源处理程序返回消息:“创建失败”(请求令牌:123 ...,HandlerErrorCode:NotStabilized)

IAM 策略、角色和子网配置都很好 我们正在尝试在所有子网都是私有的 VPC 中创建 MWAA 集群。所以没有连接到外部世界的 NAT 网关/实例。网络服务器访问模式仅为私有。

请让我知道我们必须考虑哪些事情来解决这个问题。

0 投票
0 回答
150 浏览

amazon-web-services - 在托管 apache 气流中管理环境变量

我正在使用 aws 管理的 apache 气流(也称为 mwaa)并尝试aws_default在连接中设置 aws_key_id 和 aws_secret。但是 mwaa 以某种方式创建了一个环境变量AIRFLOW_CONN_AWS_DEFAULT,其值为 asaws://并且它总是会首先尝试从这里而不是在连接中找到凭据。

在此处输入图像描述

那么有没有办法可以阻止aws默认创建这个环境变量?

0 投票
1 回答
287 浏览

python - 在气流中将 xcom 数据拉到任何操作员之外

我需要将数据从 xcom 提取到一个 python 变量中,该变量将使用一些正则表达式进行转换并进一步传递。但是,我无法在任何地方找到如何在不使用任何运算符的情况下从 xcom 读取数据(直接进入 python 代码)。我在 AWS 上使用气流 2.0.2 的 MWAA 并使用下面的代码片段。

这不起作用,因为 python 运算符将在 dag 运行后工作,而我在运行 dag 之前将 s3Path 转换后的值用于另一个运算符。我尝试将 s3Path 值设置为变量并读取它,但这不起作用,因为在上传 dag 时没有创建该变量。

我看到它ti.xcom_pull(key=messages, task_ids='sqs')可以用来从 xcom 中提取数据,但是我应该从哪里得到 ti?有没有办法让任务实例在不使用任何运算符的情况下与 xcom 一起工作。

基本上问题是如何获得 SQSRUN 发送给 xcom 的值。我无法找到有关如何使用 SQSSensor 获取的值的任何文档或在线链接。非常感谢一些帮助。

0 投票
1 回答
215 浏览

amazon-web-services - 使用 Apache Airflow (MWAA) 的托管工作流 - 如何禁用对先前运行的任务运行依赖

我有一个运行的 Apache Airflow 托管环境,其中定义并启用了许多 DAG。一些 DAG 是按计划运行的,按 15 分钟的计划运行,而另一些则没有计划。所有的 DAG 都是单任务 DAG。DAG 的结构如下:

2 级 DAG ->(触发器)1 级 DAG ->(触发器)0 级 DAG

调度的 DAG 是 2 级 DAG,而 1 级和 0 级 DAG 是非调度的。0级DAG用于ECSOperator调用预定义的弹性容器服务(ECS)任务,调用ECS任务中定义的Docker容器内的Python ETL脚本。2 级 DAG 等待 1 级 DAG 完成,然后依次等待 0 级 DAG 完成。ETL 脚本生成的完整 Python 日志在 ECS 任务运行的 CloudWatch 日志中可见,而 Airflow 任务日志仅显示高级日志记录。

计划 DAG(级别 2)中的单个任务已depends_on_past设置False为计划运行从发生。但是正在发生的事情是 Airflow 覆盖了这一点,我可以在 UI 中清楚地看到,特定级别 2 DAG 运行的失败正在阻止调度程序选择下一次运行 - 下一次调度运行状态被设置为None,我必须手动清除失败的 DAG 运行状态,然后调度程序才能再次调度它。

为什么会这样?据我所知,没有 Airflow 配置选项应该覆盖2 级 DAG 任务中False的任务级别设置。depends_on_past任何指针将不胜感激。

0 投票
1 回答
233 浏览

amazon-web-services - AWS Managed Airflow 将文件夹上传到 MWAA 环境

我将开始使用 AWS 管理的气流。为了让受管气流访问 dags,我需要将我的代码上传到 s3 存储桶中的 dags/ 目录,MWAA 会接收它。

但是,在我的代码库中,我在其他目录中有代码,例如tasks/目录。问题是当我将tasks文件夹上传到 s3 存储桶时,mwaa 没有提取它们,并且我的 dag 出现导入错误。

AWS 文档没有为此提供任何指导。我想知道以前有没有人这样做过?还是我必须将所有代码上传到dags/文件夹中?

0 投票
1 回答
599 浏览

amazon-web-services - 如何在 MWAA 中设置电子邮件警报?

在哪里可以找到气流.cfg 文件以在 MWAA 中设置电子邮件警报?在 Airflow 中,我们在 airflow.cfg 文件中进行设置,但 Amazon Managed Apache Airflow 是无服务器的,那么配置更改在哪里?我是否还需要在 S3 位置创建自己的 .cfg 文件,就像我们在 s3 位置提到的 dags 目录一样