我的 AWS 账户中有一个 MWAA Airflow 环境。我正在设置的 DAG 应该从 S3 存储桶 A 读取大量数据,过滤我想要的内容并将过滤后的结果转储到 S3 存储桶 B。它需要每分钟读取一次,因为数据每分钟都在进来。每次运行处理大约 200MB 的 json 数据。
我的初始设置是使用 env 类mw1.small
和 10 台工作机器,如果我在这个设置中只运行一次任务,每次运行大约需要 8 分钟才能完成,但是当我开始计划每分钟运行一次时,大多数都无法完成,开始需要更长的时间来运行(大约 18 分钟)并显示错误消息:
[2021-09-25 20:33:16,472] {{local_task_job.py:102}} INFO - Task exited with return code Negsignal.SIGKILL
我尝试将 env 类扩展到mw1.large
15 个工作人员,在错误出现之前能够完成更多工作,但仍然无法赶上每分钟摄取的速度。在达到工人机器最大值之前,该Negsignal.SIGKILL
错误仍会显示。
在这一点上,我应该怎么做才能扩大规模?我可以想象打开另一个 Airflow 环境,但这并没有什么意义。必须有一种方法可以在一个环境中做到这一点。