0

我在 GCP 上部署了一个 kubernetes 集群,结合了 prefect 和 dask。这些作业在正常情况下运行良好,但无法扩展 2 倍的数据。到目前为止,我已经将其范围缩小到调度程序由于高内存使用而被关闭。 Dask调度程序内存 一旦内存使用量达到2GB,作业就会失败,并出现“未检测到心跳”错误。

有一个单独的构建 python 文件可用,我们在其中设置工作内存和 cpu。有一个 dask-gateway 包,我们可以在其中获取网关选项并设置工作内存。

options.worker_memory = 32
options.worker_cores = 10
cluster = gateway.new_cluster(options)
cluster.adapt(minimum=4, maximum=20)

我无法弄清楚在哪里以及如何增加 dask-scheduler 的内存分配。

Specs:
Cluster Version: 1.19.14-gke.1900
Machine type - n1-highmem-64
Autoscaling set to 6 - 1000 nodes per zone
all nodes are allocated 63.77 CPU and 423.26 GB
4

1 回答 1

4

首先解释为什么 Flow 检测信号存在:Prefect 使用检测信号检查您的流程是否仍在运行。如果 Prefect 没有心跳,失去通信并在远程执行环境(如 Kubernetes 作业)中死亡的流将在 UI 中永久显示为正在运行。通常“未检测到心跳”是由于内存不足或当您的流程执行长时间运行的作业时发生的。

您可以尝试的一种解决方案是在运行配置中设置以下环境变量 - 这会将心跳行为从进程更改为线程,并有助于解决问题:

from prefect.run_configs import UniversalRun

flow.run_config = UniversalRun(env={"PREFECT__CLOUD__HEARTBEAT_MODE": "thread"})

正如您所提到的,最好的解决方案是增加 Dask 工作人员的内存。如果您使用长时间运行的集群,您可以这样设置:

dask-worker tcp://scheduler:port --memory-limit="4 GiB"

如果你将一个集群类传递给你的 Dask 执行器,例如coiled.Cluster,你可以同时设置:

  • scheduler_memory - 默认为 4 GiB
  • worker_memory - 默认为 8 GiB

以下是如何在流程中设置它:

import coiled
from prefect import Flow
from prefect.executors import DaskExecutor

flow = Flow("test-flow")
executor = DaskExecutor(
    cluster_class=coiled.Cluster,
    cluster_kwargs={
        "software": "user/software_env_name",
        "shutdown_on_close": True,
        "name": "prefect-cluster",
        "scheduler_memory": "4 GiB",
        "worker_memory": "8 GiB",
    },
)
flow.executor = executor
于 2021-11-15T13:59:36.267 回答