0

我在 dask worker 上运行一个简单的程序，程序如下：

import numpy as np
from dask.distributed import Client


import joblib
from sklearn.datasets import load_digits
from sklearn.model_selection import RandomizedSearchCV
from sklearn.svm import SVC

import pandas as pd  # NOTE(review): unused in this script; kept from the original.

# Address of the remote dask scheduler (30006 is the port the scheduler listens on).
SCHEDULER_ADDRESS = '127.0.0.1:30006'

digits = load_digits()

# Hyper-parameter distributions sampled by RandomizedSearchCV (n_iter draws below).
param_space = {
    'C': np.logspace(-6, 6, 13),
    'gamma': np.logspace(-8, 8, 17),
    'tol': np.logspace(-4, -1, 4),
    'class_weight': [None, 'balanced'],
}

model = SVC(kernel='rbf')
search = RandomizedSearchCV(model, param_space, cv=3, n_iter=50, verbose=10)

# Using Client as a context manager closes the connection to the scheduler when
# the block exits, including when fit() raises (the original never closed it).
# NOTE(review): Client's `timeout` is presumably in seconds, so 10000 is ~2.8 h
# -- confirm that is intended.
with Client(SCHEDULER_ADDRESS, timeout=10000) as client:
    # check=True makes distributed compare package versions across client,
    # scheduler and workers. NOTE(review): it evidently did not stop on the
    # lz4/blosc difference in the reported environment (present on scheduler
    # and workers, "None" on the client), and it does not report sklearn or
    # joblib at all -- keep the client environment pinned to the workers'
    # versions (e.g. from `pip list` on a worker).
    client.get_versions(check=True)

    # Route joblib (which RandomizedSearchCV uses for its parallel loop) to the
    # dask cluster. The dask backend scatters large inputs through the
    # scheduler, which is the step that failed with CommClosedError in the
    # reported traceback.
    with joblib.parallel_backend('dask'):
        search.fit(digits.data, digits.target)

30006 是我运行调度程序的端口。

我收到以下错误。

tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x000001DC4E701850>>, <Task finished name='Task-42' coro=<DaskDistributedBackend.apply_async.<locals>.f() done, defined at C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py:316> exception=CommClosedError('in <closed TCP>: Stream is closed')>)
Traceback (most recent call last):
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\distributed\comm\tcp.py", line 187, in read
    n_frames = await stream.read_bytes(8)
tornado.iostream.StreamClosedError: Stream is closed

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\tornado\ioloop.py", line 741, in _run_callback
    ret = callback()
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\tornado\ioloop.py", line 765, in _discard_future_result
    future.result()
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 317, in f
    batch, tasks = await self._to_func_args(func)
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 306, in _to_func_args
    await maybe_to_futures(kwargs.values())))
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py", line 289, in maybe_to_futures
    [f] = await self.client.scatter(
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\distributed\client.py", line 2084, in _scatter
    await self.scheduler.scatter(
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\distributed\core.py", line 852, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\distributed\core.py", line 635, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\distributed\comm\tcp.py", line 202, in read
    convert_stream_closed_error(self, e)
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\distributed\comm\tcp.py", line 126, in convert_stream_closed_error
    raise CommClosedError("in %s: %s" % (obj, exc)) from exc
distributed.comm.core.CommClosedError: in <closed TCP>: Stream is closed
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOLoop object at 0x000001DC4E701850>>, <Task finished name='Task-44' coro=<DaskDistributedBackend.apply_async.<locals>.f() done, defined at C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\joblib\_dask.py:316> exception=CommClosedError('in <closed TCP>: Stream is closed')>)
Traceback (most recent call last):
  File "C:\Users\User\Documents\code\condavirtualenv\lib\site-packages\distributed\comm\tcp.py", line 187, in read
    n_frames = await stream.read_bytes(8)
tornado.iostream.StreamClosedError: Stream is closed

以下是我的软件包版本信息：

{
    "scheduler": {
        "host": {
            "python": "3.8.0.final.0",
            "python-bits": 64,
            "OS": "Linux",
            "OS-release": "5.4.72-microsoft-standard-WSL2",
            "machine": "x86_64",
            "processor": "",
            "byteorder": "little",
            "LC_ALL": "C.UTF-8",
            "LANG": "C.UTF-8"
        },
        "packages": {
            "python": "3.8.0.final.0",
            "dask": "2021.01.0",
            "distributed": "2021.01.0",
            "msgpack": "1.0.0",
            "cloudpickle": "1.6.0",
            "tornado": "6.1",
            "toolz": "0.11.1",
            "numpy": "1.18.1",
            "lz4": "3.1.1",
            "blosc": "1.9.2"
        }
    },
    "workers": {
        "tcp://10.1.1.92:37435": {
            "host": {
                "python": "3.8.0.final.0",
                "python-bits": 64,
                "OS": "Linux",
                "OS-release": "5.4.72-microsoft-standard-WSL2",
                "machine": "x86_64",
                "processor": "",
                "byteorder": "little",
                "LC_ALL": "C.UTF-8",
                "LANG": "C.UTF-8"
            },
            "packages": {
                "python": "3.8.0.final.0",
                "dask": "2021.01.0",
                "distributed": "2021.01.0",
                "msgpack": "1.0.0",
                "cloudpickle": "1.6.0",
                "tornado": "6.1",
                "toolz": "0.11.1",
                "numpy": "1.18.1",
                "lz4": "3.1.1",
                "blosc": "1.9.2"
            }
        },
        "tcp://10.1.1.93:45855": {
            "host": {
                "python": "3.8.0.final.0",
                "python-bits": 64,
                "OS": "Linux",
                "OS-release": "5.4.72-microsoft-standard-WSL2",
                "machine": "x86_64",
                "processor": "",
                "byteorder": "little",
                "LC_ALL": "C.UTF-8",
                "LANG": "C.UTF-8"
            },
            "packages": {
                "python": "3.8.0.final.0",
                "dask": "2021.01.0",
                "distributed": "2021.01.0",
                "msgpack": "1.0.0",
                "cloudpickle": "1.6.0",
                "tornado": "6.1",
                "toolz": "0.11.1",
                "numpy": "1.18.1",
                "lz4": "3.1.1",
                "blosc": "1.9.2"
            }
        },
        "tcp://10.1.1.94:36523": {
            "host": {
                "python": "3.8.0.final.0",
                "python-bits": 64,
                "OS": "Linux",
                "OS-release": "5.4.72-microsoft-standard-WSL2",
                "machine": "x86_64",
                "processor": "",
                "byteorder": "little",
                "LC_ALL": "C.UTF-8",
                "LANG": "C.UTF-8"
            },
            "packages": {
                "python": "3.8.0.final.0",
                "dask": "2021.01.0",
                "distributed": "2021.01.0",
                "msgpack": "1.0.0",
                "cloudpickle": "1.6.0",
                "tornado": "6.1",
                "toolz": "0.11.1",
                "numpy": "1.18.1",
                "lz4": "3.1.1",
                "blosc": "1.9.2"
            }
        }
    },
    "client": {
        "host": {
            "python": "3.8.6.final.0",
            "python-bits": 64,
            "OS": "Windows",
            "OS-release": "10",
            "machine": "AMD64",
            "processor": "Intel64 Family 6 Model 142 Stepping 10, GenuineIntel",
            "byteorder": "little",
            "LC_ALL": "None",
            "LANG": "None"
        },
        "packages": {
            "python": "3.8.6.final.0",
            "dask": "2021.01.0",
            "distributed": "2021.01.0",
            "msgpack": "1.0.0",
            "cloudpickle": "1.6.0",
            "tornado": "6.1",
            "toolz": "0.11.1",
            "numpy": "1.18.1",
            "lz4": "None",
            "blosc": "None"
        }
    }
}

我怀疑问题出在 joblib 上，因为如果去掉 `with joblib.parallel_backend('dask'):` 这一行再运行，fit 命令可以正常工作。另外，我在 dask worker 上试过一个简单的 numpy 数组计算，它可以正常运行，所以 dask worker 与我的客户端之间的连接是正常的。我还尝试过不同版本的 joblib（0.16.0、0.17.0、1.0.0、1.0.1），同样的错误依然存在。

4

1 回答 1

0

问题在于 worker 和客户端上运行的库版本不一致。我在 worker 上执行了 `pip list`，然后在客户端的 Dockerfile 中按相同的版本号安装了所有库。现在可以正常运行了，我能够在 dask worker 上完成 fit。

于 2021-04-07T11:43:32.450 回答