1

我是 DASK 的新手,想测试在集群上运行 DASK。集群有一个头服务器和几个其他节点。登录头服务器后,我可以通过简单的 ssh 无需密码即可进入其他节点。我想运行一个简单的函数来迭代一个大数组。该函数定义如下。就是将 dt64 转换为 numpy datetime 对象。

import xarray as xr import numpy as np from dask import compute, delayed import dask.multiprocessing from datetime import datetime, timedelta def converdt64(dt64): ts = (dt64 - np.datetime64('1970-01-01T00:00:00Z')) / np.timedelta64(1, 's') return datetime.utcfromtimestamp(ts)

然后在终端上,我通过应用这个函数来迭代一个大小为 N 的 1D 数组。

values = [delayed(convertdt64)(x) for x in arraydata] results1 = compute(*values,scheduler='processes’)

这在头服务器上使用了一些内核,并且它可以工作,尽管速度很慢。然后我尝试使用客户端在集群的几个节点上启动该功能,如下所示:

from dask.distributed import Client client = Client("10.140.251.254:8786 ») results = compute(*values, scheduler='distributed’)

它根本不起作用。有一些警告和一条错误消息,如下所示。

distributed.comm.tcp - WARNING - Could not set timeout on TCP stream: [Errno 92] Protocol not available
distributed.comm.tcp - WARNING - Closing dangling stream in <TCP local=tcp://10.140.251.254:57257 remote=tcp://10.140.251.254:8786>

CancelledError: convertdt64-0205ad5e-214b-4683-b5c4-b6a2a6d8e52f

我也尝试了 dask.bag 并且收到了相同的错误消息。集群上的并行计算不起作用的原因可能是什么?是由于某些服务器/网络配置,还是我对 DASK 客户端的使用不正确?在此先感谢您的帮助 !

最好的祝愿

香农 X

4

1 回答 1

0

...然后我尝试使用客户端在集群的多个节点上启动该功能,如下所示:

我在尝试在调度程序上运行任务时遇到了类似的问题。节点连接得很好。但是,尝试提交任务会导致取消。

记录的示例要么是本地的,要么来自与调度程序相同的节点。当我将客户端移动到调度程序节点时,问题就消失了。

于 2018-12-02T03:31:40.093 回答