0

考虑一个prefect事先不知道内存需求的任务。如果任务因为worker没有足够的内存而失败,是否可以修改daskworker参数并重新运行任务?

如果有一种方法可以在每次失败后将每个工作人员的内存分配增加一些值,那就太好了。

4

1 回答 1

2

很难给出一个普遍的答案,因为这取决于您的基础设施。

  1. 例如,如果您想在cluster_class每次流运行时为 Dask ad-hoc 提供自定义关键字参数,您可以将动态函数传递给DaskExecutor's cluster_class. 该函数可以n_workersParameter任务中检索值,如下所示:
import prefect
from prefect import Flow, Parameter
from prefect.executors import DaskExecutor

def dynamic_executor():
    from distributed import LocalCluster

    # could be instead some other class e.g. from dask_cloudprovider.aws import FargateCluster
    return LocalCluster(n_workers=prefect.context.parameters["n_workers"])

with Flow(
    "dynamic_n_workers", executor=DaskExecutor(cluster_class=dynamic_executor)
) as flow:
    flow.add_task(Parameter("n_workers", default=5))

这意味着您可以使用n_workers定义的 ad-hoc 的不同值启动新的流运行。

  1. 第二个选项是在每个流运行的基础上在运行配置中分配更多内存 - 例如,您可以覆盖UImemory_request上的集合:KubernetesRun
with Flow(
        FLOW_NAME,
        storage=STORAGE,
        run_config=KubernetesRun(
            labels=["k8s"],
            cpu_request=0.5,
            memory_request="2Gi",
        ),
) as flow:

上面的代码片段定义了 2 GB,但如果您发现流程运行以 OOM 错误结束并且您需要更多,您可以从具有更高内存请求的 UI 触发新流程运行。

  1. 最后一个选项是直接在流定义中覆盖执行器值:
import coiled
from prefect.executors import DaskExecutor

flow.executor = DaskExecutor(
    cluster_class=coiled.Cluster,
    cluster_kwargs={
        "software": "user/software_env_name",
        "shutdown_on_close": True,
        "name": "prefect-cluster",
        "scheduler_memory": "4 GiB",
        "worker_memory": "8 GiB",
    },
)

只要您使用脚本存储(例如,Git 存储类之一,例如 GitHub、Git、Gitlab、Bitbucket 等)而不是 pickle 存储,并且您使用修改值提交代码worker_memory,这应该反映在您的新流程中运行,因为有关执行程序的元数据未存储在后端 - 它是从您的流存储中检索的。

于 2021-12-13T09:09:03.947 回答