I am running a simple program on my dask workers. Here is the program:
import numpy as np
import pandas as pd
import joblib
from dask.distributed import Client
from sklearn.datasets import load_digits
from sklearn.model_selection import RandomizedSearchCV
from sklearn.svm import SVC

client = Client('127.0.0.1:30006', timeout=10000)
client.get_versions(check=True)

digits = load_digits()

param_space = {
    'C': np.logspace(-6, 6, 13),
    'gamma': np.logspace(-8, 8, 17),
    'tol': np.logspace(-4, -1, 4),
    'class_weight': [None, 'balanced'],
}

model = SVC(kernel='rbf')
search = RandomizedSearchCV(model, param_space, cv=3, n_iter=50, verbose=10)

with joblib.parallel_backend('dask'):  # run the search on the dask workers
    search.fit(digits.data, digits.target)
30006 is the port on which I am running the scheduler.
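For context, the cluster was brought up in the usual way, roughly equivalent to the sketch below (this is only an illustration, not my exact deployment commands; <scheduler-host> is a placeholder):

dask-scheduler --port 30006                  # on the scheduler host (reachable from my Windows client as 127.0.0.1:30006)
dask-worker tcp://<scheduler-host>:30006     # on each of the three worker machines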
When I run the program above, I get the following error:
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x000001DC4E701850>>, <Task finished name='Task-42' coro=<DaskDistributedBackend.apply_async.<locals>.f() done, defined at C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py:316> exception=CommClosedError('in <closed TCP>: Stream is closed')>)
Traceback (most recent call last):
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\distributed\comm\tcp.py", line 187, in read
    n_frames = await stream.read_bytes(8)
tornado.iostream.StreamClosedError: Stream is closed
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\tornado\ioloop.py", line 741, in _run_callback
    ret = callback()
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\tornado\ioloop.py", line 765, in _discard_future_result
    future.result()
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 317, in f
    batch, tasks = await self._to_func_args(func)
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 306, in _to_func_args
    await maybe_to_futures(kwargs.values())))
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 289, in maybe_to_futures
    [f] = await self.client.scatter(
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\distributed\client.py", line 2084, in _scatter
    await self.scheduler.scatter(
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\distributed\core.py", line 852, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\distributed\core.py", line 635, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\distributed\comm\tcp.py", line 202, in read
    convert_stream_closed_error(self, e)
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\distributed\comm\tcp.py", line 126, in convert_stream_closed_error
    raise CommClosedError("in %s: %s" % (obj, exc)) from exc
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x000001DC4E701850>>, <Task finished name='Task-44' coro=<DaskDistributedBackend.apply_async.<locals>.f() done, defined at C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py:316> exception=CommClosedError('in <closed TCP>: Stream is closed')>)
Traceback (most recent call last):
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\distributed\comm\tcp.py", line 187, in read
    n_frames = await stream.read_bytes(8)
tornado.iostream.StreamClosedError: Stream is closed
Here is my package information (the output of client.get_versions()):
{
  "scheduler": {
    "host": {
      "python": "3.8.0.final.0",
      "python-bits": 64,
      "OS": "Linux",
      "OS-release": "5.4.72-microsoft-standard-WSL2",
      "machine": "x86_64",
      "processor": "",
      "byteorder": "little",
      "LC_ALL": "C.UTF-8",
      "LANG": "C.UTF-8"
    },
    "packages": {
      "python": "3.8.0.final.0",
      "dask": "2021.01.0",
      "distributed": "2021.01.0",
      "msgpack": "1.0.0",
      "cloudpickle": "1.6.0",
      "tornado": "6.1",
      "toolz": "0.11.1",
      "numpy": "1.18.1",
      "lz4": "3.1.1",
      "blosc": "1.9.2"
    }
  },
  "workers": {
    "tcp://10.1.1.92:37435": {
      "host": {
        "python": "3.8.0.final.0",
        "python-bits": 64,
        "OS": "Linux",
        "OS-release": "5.4.72-microsoft-standard-WSL2",
        "machine": "x86_64",
        "processor": "",
        "byteorder": "little",
        "LC_ALL": "C.UTF-8",
        "LANG": "C.UTF-8"
      },
      "packages": {
        "python": "3.8.0.final.0",
        "dask": "2021.01.0",
        "distributed": "2021.01.0",
        "msgpack": "1.0.0",
        "cloudpickle": "1.6.0",
        "tornado": "6.1",
        "toolz": "0.11.1",
        "numpy": "1.18.1",
        "lz4": "3.1.1",
        "blosc": "1.9.2"
      }
    },
    "tcp://10.1.1.93:45855": {
      "host": {
        "python": "3.8.0.final.0",
        "python-bits": 64,
        "OS": "Linux",
        "OS-release": "5.4.72-microsoft-standard-WSL2",
        "machine": "x86_64",
        "processor": "",
        "byteorder": "little",
        "LC_ALL": "C.UTF-8",
        "LANG": "C.UTF-8"
      },
      "packages": {
        "python": "3.8.0.final.0",
        "dask": "2021.01.0",
        "distributed": "2021.01.0",
        "msgpack": "1.0.0",
        "cloudpickle": "1.6.0",
        "tornado": "6.1",
        "toolz": "0.11.1",
        "numpy": "1.18.1",
        "lz4": "3.1.1",
        "blosc": "1.9.2"
      }
    },
    "tcp://10.1.1.94:36523": {
      "host": {
        "python": "3.8.0.final.0",
        "python-bits": 64,
        "OS": "Linux",
        "OS-release": "5.4.72-microsoft-standard-WSL2",
        "machine": "x86_64",
        "processor": "",
        "byteorder": "little",
        "LC_ALL": "C.UTF-8",
        "LANG": "C.UTF-8"
      },
      "packages": {
        "python": "3.8.0.final.0",
        "dask": "2021.01.0",
        "distributed": "2021.01.0",
        "msgpack": "1.0.0",
        "cloudpickle": "1.6.0",
        "tornado": "6.1",
        "toolz": "0.11.1",
        "numpy": "1.18.1",
        "lz4": "3.1.1",
        "blosc": "1.9.2"
      }
    }
  },
  "client": {
    "host": {
      "python": "3.8.6.final.0",
      "python-bits": 64,
      "OS": "Windows",
      "OS-release": "10",
      "machine": "AMD64",
      "processor": "Intel64 Family 6 Model 142 Stepping 10, GenuineIntel",
      "byteorder": "little",
      "LC_ALL": "None",
      "LANG": "None"
    },
    "packages": {
      "python": "3.8.6.final.0",
      "dask": "2021.01.0",
      "distributed": "2021.01.0",
      "msgpack": "1.0.0",
      "cloudpickle": "1.6.0",
      "tornado": "6.1",
      "toolz": "0.11.1",
      "numpy": "1.18.1",
      "lz4": "None",
      "blosc": "None"
    }
  }
}
I suspect the problem lies with joblib, because if I run the script without the "with joblib.parallel_backend('dask'):" line, the fit call works fine. I also tried a simple numpy array computation on the dask workers and it works, so the connection between the dask workers and my client is fine (both checks are sketched below). I have tried different versions of joblib (0.16.0, 0.17.0, 1.0.0, 1.0.1) and the same error persists.
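The two sanity checks looked roughly like this (a sketch from memory; the exact code may have differed slightly):

# 1) Without the dask backend, the same search completes on the client machine:
search.fit(digits.data, digits.target)

# 2) A plain numpy computation submitted to the cluster also runs fine:
future = client.submit(np.sum, np.arange(1_000_000))
print(future.result())  # comes back from a worker without errors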