本地执行器在调度任务时产生新进程。它创建的进程数量是否有限制。我需要改变它。我需要知道 flow.cfg 中调度程序的“max_threads”和“parallelism”有什么区别?
3 回答
并行性:不是一个非常具有描述性的名称。描述说它设置了气流安装的最大任务实例,这有点模棱两可——如果我有两台运行气流工作的主机,我会在两台主机上安装气流,所以应该是两个安装,但基于上下文这里的“每次安装”是指“每个气流状态数据库”。我将其命名为 max_active_tasks。
dag_concurrency:尽管名称基于注释,但这实际上是任务并发,它是每个工作人员的。我将其命名为 max_active_tasks_for_worker (per_worker 建议它是工人的全局设置,但我认为您可以为此设置不同值的工人)。
max_active_runs_per_dag:这还不错,但是由于它似乎只是匹配 DAG kwarg 的默认值,因此最好在名称中反映这一点,例如 default_max_active_runs_for_dags 所以让我们继续讨论 DAG kwargs:
concurrency:再一次,有一个像这样的通用名称,再加上并发用于其他地方不同的东西这一事实使得这很混乱。我称之为 max_active_tasks。
max_active_runs:这对我来说听起来不错。
来源:https ://issues.apache.org/jira/browse/AIRFLOW-57
max_threads让用户可以控制 CPU 的使用。它指定调度程序并行性。
现在是 2019 年,并且已经发布了更多更新的文档。简而言之:
AIRFLOW__CORE__PARALLELISM
是可以在所有 Airflow 上同时运行的任务实例的最大数量(所有 dag 上的所有任务)
AIRFLOW__CORE__DAG_CONCURRENCY
是单个特定 DAG 允许同时运行的最大任务实例数
这些文档更详细地描述了它:
根据https://www.astronomer.io/guides/airflow-scaling-workers/:
并行度是可以在气流上同时运行的最大任务实例数。这意味着在所有正在运行的 DAG 中,一次运行的任务不超过 32 个。
和
dag_concurrency 是允许在特定 dag 中并发运行的任务实例数。换句话说,你可以有 2 个 DAG 并行运行 16 个任务,但一个有 50 个任务的 DAG 也只能运行 16 个任务,而不是 32 个
而且,根据https://airflow.apache.org/faq.html#how-to-reduce-airflow-dag-scheduling-latency-in-production:
max_threads:调度程序将并行生成多个线程来调度 dag。这由 max_threads 控制,默认值为 2。用户应在生产中将此值增加到更大的值(例如,调度程序运行的 CPU 数量 - 1)。
但似乎最后一块不应该占用太多时间,因为它只是“调度”部分。不是实际的运行部分。因此,我们认为没有必要进行太多调整,max_threads
但确实影响了我们。AIRFLOW__CORE__PARALLELISM
AIRFLOW__CORE__DAG_CONCURRENCY
调度max_threads
程序是并行化调度程序的进程数。max_threads
不能超过 cpu 计数。LocalExecutorparallelism
是 LocalExecutor 应该运行的并发任务数。调度程序和 LocalExecutor 都使用 python 的多处理库进行并行处理。