问题标签 [mwaa]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
airflow - Amazon Managed Airflow (MWAA) 导入自定义插件
我正在设置 AWS MWAA 实例,但在导入自定义插件时遇到问题。
我的本地项目结构如下所示:
我尝试在 s3 存储桶中匹配此结构:
但是,当我在本地项目上使用自定义运算符时,导入的工作方式如下 -
在 MWAA 上它只承认这样的进口 -
有没有办法让 MWAA 将进口识别为本地(来自运营商)?我应该以某种方式将文件上传到s3吗?
我还尝试上传 plugins.zip 文件,但没有成功:
amazon-web-services - MWAA Airflow Scaling:当我必须运行频繁且耗时的脚本时该怎么办?(Negsignal.SIGKILL)
我的 AWS 账户中有一个 MWAA Airflow 环境。我正在设置的 DAG 应该从 S3 存储桶 A 读取大量数据,过滤我想要的内容并将过滤后的结果转储到 S3 存储桶 B。它需要每分钟读取一次,因为数据每分钟都在进来。每次运行处理大约 200MB 的 json 数据。
我的初始设置是使用 env 类mw1.small
和 10 台工作机器,如果我在这个设置中只运行一次任务,每次运行大约需要 8 分钟才能完成,但是当我开始计划每分钟运行一次时,大多数都无法完成,开始需要更长的时间来运行(大约 18 分钟)并显示错误消息:
我尝试将 env 类扩展到mw1.large
15 个工作人员,在错误出现之前能够完成更多工作,但仍然无法赶上每分钟摄取的速度。在达到工人机器最大值之前,该Negsignal.SIGKILL
错误仍会显示。
在这一点上,我应该怎么做才能扩大规模?我可以想象打开另一个 Airflow 环境,但这并没有什么意义。必须有一种方法可以在一个环境中做到这一点。
airflow-scheduler - 气流任务正在运行但计划上升 - MWAA
amazon-web-services - PythonVirtualenvOperator 作为 Amazon MWAA 中的自定义插件?
有人尝试过Amazon MWAA(Amazon Managed Workflows for Apache Airflow)中的 Apache Airflow PythonVirtualenvOperator的自定义插件吗?
我正确地遵循了那里描述的所有步骤,但是当我运行 dag 时出现“没有这样的文件或目录:'virtualenv':'virtualenv' ”错误。
amazon-web-services - AWS MWAA Cloudformation 堆栈创建失败并出现 NotStabilized
Cloudformation 在创建 AWS MWAA 集群时返回以下错误:
资源处理程序返回消息:“创建失败”(请求令牌:123 ...,HandlerErrorCode:NotStabilized)
IAM 策略、角色和子网配置都很好 我们正在尝试在所有子网都是私有的 VPC 中创建 MWAA 集群。所以没有连接到外部世界的 NAT 网关/实例。网络服务器访问模式仅为私有。
请让我知道我们必须考虑哪些事情来解决这个问题。
python - 在气流中将 xcom 数据拉到任何操作员之外
我需要将数据从 xcom 提取到一个 python 变量中,该变量将使用一些正则表达式进行转换并进一步传递。但是,我无法在任何地方找到如何在不使用任何运算符的情况下从 xcom 读取数据(直接进入 python 代码)。我在 AWS 上使用气流 2.0.2 的 MWAA 并使用下面的代码片段。
这不起作用,因为 python 运算符将在 dag 运行后工作,而我在运行 dag 之前将 s3Path 转换后的值用于另一个运算符。我尝试将 s3Path 值设置为变量并读取它,但这不起作用,因为在上传 dag 时没有创建该变量。
我看到它ti.xcom_pull(key=messages, task_ids='sqs')
可以用来从 xcom 中提取数据,但是我应该从哪里得到 ti?有没有办法让任务实例在不使用任何运算符的情况下与 xcom 一起工作。
基本上问题是如何获得 SQSRUN 发送给 xcom 的值。我无法找到有关如何使用 SQSSensor 获取的值的任何文档或在线链接。非常感谢一些帮助。
amazon-web-services - 使用 Apache Airflow (MWAA) 的托管工作流 - 如何禁用对先前运行的任务运行依赖
我有一个运行的 Apache Airflow 托管环境,其中定义并启用了许多 DAG。一些 DAG 是按计划运行的,按 15 分钟的计划运行,而另一些则没有计划。所有的 DAG 都是单任务 DAG。DAG 的结构如下:
2 级 DAG ->(触发器)1 级 DAG ->(触发器)0 级 DAG
调度的 DAG 是 2 级 DAG,而 1 级和 0 级 DAG 是非调度的。0级DAG用于ECSOperator
调用预定义的弹性容器服务(ECS)任务,调用ECS任务中定义的Docker容器内的Python ETL脚本。2 级 DAG 等待 1 级 DAG 完成,然后依次等待 0 级 DAG 完成。ETL 脚本生成的完整 Python 日志在 ECS 任务运行的 CloudWatch 日志中可见,而 Airflow 任务日志仅显示高级日志记录。
计划 DAG(级别 2)中的单个任务已depends_on_past
设置False
为计划运行从发生。但是正在发生的事情是 Airflow 覆盖了这一点,我可以在 UI 中清楚地看到,特定级别 2 DAG 运行的失败正在阻止调度程序选择下一次运行 - 下一次调度运行状态被设置为None
,我必须手动清除失败的 DAG 运行状态,然后调度程序才能再次调度它。
为什么会这样?据我所知,没有 Airflow 配置选项应该覆盖2 级 DAG 任务中False
的任务级别设置。depends_on_past
任何指针将不胜感激。
amazon-web-services - AWS Managed Airflow 将文件夹上传到 MWAA 环境
我将开始使用 AWS 管理的气流。为了让受管气流访问 dags,我需要将我的代码上传到 s3 存储桶中的 dags/ 目录,MWAA 会接收它。
但是,在我的代码库中,我在其他目录中有代码,例如tasks/
目录。问题是当我将tasks
文件夹上传到 s3 存储桶时,mwaa 没有提取它们,并且我的 dag 出现导入错误。
AWS 文档没有为此提供任何指导。我想知道以前有没有人这样做过?还是我必须将所有代码上传到dags/
文件夹中?
amazon-web-services - 如何在 MWAA 中设置电子邮件警报?
在哪里可以找到气流.cfg 文件以在 MWAA 中设置电子邮件警报?在 Airflow 中,我们在 airflow.cfg 文件中进行设置,但 Amazon Managed Apache Airflow 是无服务器的,那么配置更改在哪里?我是否还需要在 S3 位置创建自己的 .cfg 文件,就像我们在 s3 位置提到的 dags 目录一样