0

我的 AWS 账户中有一个 MWAA Airflow 环境。我正在设置的 DAG 应该从 S3 存储桶 A 读取大量数据,过滤我想要的内容并将过滤后的结果转储到 S3 存储桶 B。它需要每分钟读取一次,因为数据每分钟都在进来。每次运行处理大约 200MB 的 json 数据。

我的初始设置是使用 env 类mw1.small和 10 台工作机器,如果我在这个设置中只运行一次任务,每次运行大约需要 8 分钟才能完成,但是当我开始计划每分钟运行一次时,大多数都无法完成,开始需要更长的时间来运行(大约 18 分钟)并显示错误消息:

[2021-09-25 20:33:16,472] {{local_task_job.py:102}} INFO - Task exited with return code Negsignal.SIGKILL

我尝试将 env 类扩展到mw1.large15 个工作人员,在错误出现之前能够完成更多工作,但仍然无法赶上每分钟摄取的速度。在达到工人机器最大值之前,该Negsignal.SIGKILL错误仍会显示。

在这一点上,我应该怎么做才能扩大规模?我可以想象打开另一个 Airflow 环境,但这并没有什么意义。必须有一种方法可以在一个环境中做到这一点。

4

1 回答 1

0

我找到了解决方案,对于 MWAA,编辑环境并在 下Airflow configuration options,设置这些配置

  1. celery.sync_parallelism = 1
  2. celery.worker_autoscale = 1,1

这将确保您的 worker 机器一次运行 1 个作业,防止多个作业共享 worker,从而节省内存并减少运行时间。

于 2021-09-29T21:44:08.807 回答