1

我目前对 MWAA 的理解

MWAA 将 Fargate 用于 Scheduler(s) 和 Worker(s)。MWAA 必须至少有 1 个调度程序,它必须是 Fargate,我假设 24/7 无休止的进程(以避免退役)。Scheduler Fargate 机器也在运行 Celery executor。

触发时,DAG 被 Executor 拆分为任务,每个任务都被添加到队列中,由 Workers 从中拉取。如果是 bash 运算符,任务会提交给 Fargate Worker 并在那里执行。

工人扩展由 Fargate 管理,您只需指定最小/最大计数。如果任务同时提交,工人退役被触发 - 它可能会失败(知道问题)。

问题

假设执行单任务 DAG。任务在另一个 AWS 服务上执行,即 EMR ( EmrAddStepsOperator)。

  1. EmrAddStepsOperator先提交给 MWAA Fargate Worker,然后才提交给 EMR?还是直接从 MWAA Scheduler 提交给 EMR?
  2. 如果我的所有任务都在 EMR 执行,我是否需要 MWAA 工作人员?
4

1 回答 1

1

无论 MWAA / Google Cloud Composer / 其他什么,答案都是一样的。

Airflow 是编排工具。任务在 Airflow 工作人员上运行,但任务的“核心”可以在另一个服务上执行。考虑执行某些 SQL 的任务的用例。SQL 的实际计算是在数据库上完成的,而不是在提交 SQL 作业的机器上。Airflow 必须为每个操作员创建一个任务,该任务必须在 Airflow 工作人员上运行。如果任务对工作人员本身执行计算/处理或将作业提交给另一个服务并等待取决于任务本身的响应。

为了更好地解释:execute()任何操作员必须实现的功能都在您的 Celery worker 上运行。在此函数中,您可以在部分代码中将作业提交到另一个服务(如 EMR)。在这些情况下,您还可以选择是否要保留 Celery 工作人员直到外部服务 (EMR) 返回答案(同步方式),或者同时释放 Celery 工作人员做其他事情(运行另一个任务)。这取决于运营商的实施方式。

所以回答你的问题:

  1. 当 Airflow 调度程序执行时EmrAddStepsOperator,将为它创建一个任务。该任务将由 处理CeleryExecutor并将发送给 Celery 工人。当任务开始运行时,它将执行add_job_flow_steps,然后才会将一个步骤提交给 EMR。

  2. 是的,因为CeleryExecutor向 Celery 工人提交任务。它不知道/关心您的任务正在与哪些服务进行交互。也就是说 - 如果您的任务只是将作业提交给其他服务,那么您可能不需要具有高内存/cpu 的工作人员,因为工作人员不进行大量处理。

于 2021-11-30T18:49:01.097 回答