I'm using code based on the example provided in the documentation:
import dask.bag
import distributed
from dask_kubernetes import KubeCluster

cluster = KubeCluster.from_yaml('worker-spec.yml')
cluster.adapt(minimum=0, maximum=24, interval="20000ms")
# tasks and make_task(...) are defined elsewhere in my own code
dag = dask.bag.from_sequence(tasks).map(lambda x: make_task(x).execute())
with distributed.Client(cluster) as client:
    results = dag.compute(scheduler=client)
cluster.close()
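In my case, the execute() function does a lot of IO and takes about 5-10 minutes to run. I'd like to configure KubeCluster and the dask scheduler in a way that maximizes the chance that all of these long-running tasks complete successfully.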
My question has two parts. First, how do I override distributed configuration settings? I'd like to try something like
dask.config.set({'scheduler.work-stealing': False})
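but I don't know where the right place to set it is. Specifically, I don't know whether this is something every worker needs to know about, or whether I can get away with setting it only in the process where I instantiate KubeCluster.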
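For reference, the defaults for distributed live under the `distributed.` namespace in dask's config, so the full key for this setting is `distributed.scheduler.work-stealing`. Besides `dask.config.set` in the local process, dask also reads `DASK_`-prefixed environment variables in every process it starts in, using `__` as the nesting separator and parsing values as Python literals. A minimal sketch, assuming you want to push such settings into the worker pods through the container `env` field of `worker-spec.yml`; the helpers `dask_env_var` and `k8s_env_entries` are hypothetical, not part of dask or dask-kubernetes:

```python
from typing import Any, Dict, List


def dask_env_var(key: str) -> str:
    """Return the environment-variable name dask reads for a dotted config key.

    Hypothetical helper: dask strips the DASK_ prefix, lower-cases the rest and
    treats "__" as the nesting separator; hyphens and underscores are
    interchangeable in config keys, so "-" is written as "_" here.
    """
    parts = key.split(".")
    return "DASK_" + "__".join(p.replace("-", "_").upper() for p in parts)


def k8s_env_entries(settings: Dict[str, Any]) -> List[Dict[str, str]]:
    """Build Kubernetes container `env` entries from dask config settings.

    Hypothetical helper: values are rendered with repr() so that dask's
    literal parsing turns "False" back into the boolean False.
    """
    return [{"name": dask_env_var(k), "value": repr(v)} for k, v in settings.items()]


settings = {"distributed.scheduler.work-stealing": False}
print(k8s_env_entries(settings))
```

Whether the scheduler itself also needs the variable depends on where it runs in your dask-kubernetes version; if it runs in the local process, calling `dask.config.set` there before creating the cluster may be enough.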
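The second part of my question is about recommendations for long-running tasks (more than a few minutes). I've been experimenting with the default settings. Sometimes everything goes fine; other times the compute() call fails with the following exception: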
<... omitting caller from the traceback ...>
File "/usr/local/lib/python3.7/site-packages/dask/base.py", line 436, in compute
results = schedule(dsk, keys, **kwargs)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 2587, in get
results = self.gather(packed, asynchronous=asynchronous, direct=direct)
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1885, in gather
asynchronous=asynchronous,
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 767, in sync
self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 345, in sync
raise exc.with_traceback(tb)
File "/usr/local/lib/python3.7/site-packages/distributed/utils.py", line 329, in f
result[0] = yield future
File "/usr/local/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
value = future.result()
File "/usr/local/lib/python3.7/site-packages/distributed/client.py", line 1741, in _gather
raise exception.with_traceback(traceback)
distributed.scheduler.KilledWorker: ("('lambda-364defe33868bf6e4864da2933065a12', 3)", <Worker 'tcp://172.18.7.71:39029', name: 9, memory: 0, processing: 4>)
I'm running a recent commit from the master branch: dask-kubernetes@git+git://github.com/dask/dask-kubernetes.git@add93d56ba1ac2f7d00576bd3f2d1be0db3e1757.
Edit:
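I've updated my code snippet to show that I'm calling adapt() with the minimum number of workers set to 0. I'm starting to wonder whether dropping to 0 workers could cause the scheduler to shut down before it returns the compute() results.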