0

我正在使用文档中提供的示例中的内容

import dask.bag
from dask_kubernetes import KubeCluster


cluster = KubeCluster.from_yaml('worker-spec.yml')
cluster.adapt(minimum=0, maximum=24, interval="20000ms")
dag = dask.bag.from_sequence(tasks).map(lambda x: make_task(x).execute())

with distributed.Client(dask_cluster) as client:
    results = dag.compute(scheduler=client)

cluster.close()

在我的例子中,这个execute()函数做了很多 IO,运行大约需要 5-10 分钟。我想以KubeCluster某种方式配置 and dask 调度程序,以最大限度地提高所有这些长期运行任务的顺利进行的机会。

我的问题有两个部分。首先,如何覆盖distributed配置设置?我想尝试类似的东西

dask.config.set({'scheduler.work-stealing': False})

但我不知道设置它的正确位置是什么。具体来说,我不知道这是否是每个工作人员都应该知道的事情,或者我是否可以通过仅在实例化KubeCluster.

我的问题的第二部分与对长时间运行(超过几分钟)的任务的建议有关。我一直在尝试使用默认设置。有时一切顺利,有时compute()调用失败,但出现以下异常:

  <... omitting caller from the traceback ...>
  File "/usr/local/lib/python3.7/site-packages/dask/base.py", line 436, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 2587, in get
    results = self.gather(packed, asynchronous=asynchronous, direct=direct)
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1885, in gather
    asynchronous=asynchronous,
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 767, in sync
    self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 345, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 329, in f
    result[0] = yield future
  File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1741, in _gather
    raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ("('lambda-364defe33868bf6e4864da2933065a12', 3)", <Worker 'tcp://172.18.7.71:39029', name: 9, memory: 0, processing: 4>)

我正在从 master 分支运行最近的提交:dask-kubernetes@git+git://github.com/dask/dask-kubernetes.git@add93d56ba1ac2f7d00576bd3f2d1be0db3e1757.

编辑:

我更新了我的代码片段,以表明我正在调用该adapt()函数,并将最小数量的工作人员设置为 0。我开始想知道是否达到 0 个工作人员可能会导致调度程序在它返回compute()结果之前关闭。

4

1 回答 1

0

首先,如何覆盖分布式配置设置?

您可以通过修改配置 YAML 文件或设置环境变量来覆盖设置。

因此,在您的情况下,您可以更新您的~/.config/dask/distributed.yaml文件。

distributed:
  scheduler:
    work-stealing: false

或者通过设置环境变量。

export DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING=False

有时一切顺利,有时计算()调用失败,出现以下异常......

发生异常的KilledWorker原因有很多。我们提供了一个关于常见案例的文档页面。

大多数情况下,我发现这是因为任务使用了比可用内存更多的内存,并且它被 OOM 杀手杀死了。

于 2020-05-26T09:15:41.067 回答