I am using the following code to load a CSV file into dask_cudf and then create a DaskDeviceQuantileDMatrix for XGBoost, which raises an error:

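from dask.distributed import Client
from dask.utils import parse_bytes
from dask_cuda import LocalCUDACluster
import dask_cudf
import xgboost as xgb

cluster = LocalCUDACluster(rmm_pool_size=parse_bytes("9GB"), n_workers=5, threads_per_worker=1)
client = Client(cluster)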

ddb = dask_cudf.read_csv('/home/ubuntu/dataset.csv')
xTrain = ddb.iloc[:,20:]
yTrain = ddb.iloc[:,1:2]

dTrain = xgb.dask.DaskDeviceQuantileDMatrix(client=client, data=xTrain, label=yTrain)

Error:

---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
<ipython-input-16-2cca13ac807f> in <module>
----> 1 dTrain = xgb.dask.DaskDeviceQuantileDMatrix(client=client, data=xTrain, label=yTrain)

/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/xgboost/dask.py in __init__(self, client, data, label, missing, weight, base_margin, label_lower_bound, label_upper_bound, feature_names, feature_types, max_bin)
    508                          label_upper_bound=label_upper_bound,
    509                          feature_names=feature_names,
--> 510                          feature_types=feature_types)
    511         self.max_bin = max_bin
    512         self.is_quantile = True

/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/xgboost/dask.py in __init__(self, client, data, label, missing, weight, base_margin, label_lower_bound, label_upper_bound, feature_names, feature_types)
    229                                  base_margin=base_margin,
    230                                  label_lower_bound=label_lower_bound,
--> 231                                  label_upper_bound=label_upper_bound)
    232 
    233     def __await__(self):

/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    835         else:
    836             return sync(
--> 837                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    838             )
    839 

/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    338     if error[0]:
    339         typ, exc, tb = error[0]
--> 340         raise exc.with_traceback(tb)
    341     else:
    342         return result[0]

/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/utils.py in f()
    322             if callback_timeout is not None:
    323                 future = asyncio.wait_for(future, callback_timeout)
--> 324             result[0] = yield future
    325         except Exception as exc:
    326             error[0] = sys.exc_info()

/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/xgboost/dask.py in map_local_data(self, client, data, label, weights, base_margin, label_lower_bound, label_upper_bound)
    311 
    312         for part in parts:
--> 313             assert part.status == 'finished'
    314 
    315         # Preserving the partition order for prediction.

AssertionError: 

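I don't know what is causing this error, since it says nothing beyond "AssertionError". My dataset is too large to read into a single GPU, so I use dask_cudf to split it up as I read it from disk and then feed it directly into the data structure XGBoost needs. I'm not sure whether this is a dask_cudf problem or an XGBoost problem.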

New error when I persist and use "wait":

distributed.core - ERROR - 2154341415 exceeds max_bin_len(2147483647)
Traceback (most recent call last):
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/core.py", line 563, in handle_stream
    handler(**merge(extra, msg))
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/scheduler.py", line 2382, in update_graph_hlg
    dsk, dependencies, annotations = highlevelgraph_unpack(hlg)
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/protocol/highlevelgraph.py", line 161, in highlevelgraph_unpack
    hlg = loads_msgpack(*dumped_hlg)
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/protocol/core.py", line 223, in loads_msgpack
    payload, object_hook=msgpack_decode_default, use_list=False, **msgpack_opts
  File "msgpack/_unpacker.pyx", line 195, in msgpack._cmsgpack.unpackb
ValueError: 2154341415 exceeds max_bin_len(2147483647)
tornado.application - ERROR - Exception in callback <bound method Client._heartbeat of <Client: 'tcp://127.0.0.1:43507' processes=4 threads=4, memory=49.45 GB>>
Traceback (most recent call last):
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/tornado/ioloop.py", line 905, in _run
    return self.callback()
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/client.py", line 1177, in _heartbeat
    self.scheduler_comm.send({"op": "heartbeat-client"})
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/batched.py", line 136, in send
    raise CommClosedError
distributed.comm.core.CommClosedError
distributed.core - ERROR - Exception while handling op register-client
Traceback (most recent call last):
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/core.py", line 491, in handle_comm
    result = await result
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/scheduler.py", line 3247, in add_client
    await self.handle_stream(comm=comm, extra={"client": client})
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/core.py", line 563, in handle_stream
    handler(**merge(extra, msg))
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/scheduler.py", line 2382, in update_graph_hlg
    dsk, dependencies, annotations = highlevelgraph_unpack(hlg)
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/protocol/highlevelgraph.py", line 161, in highlevelgraph_unpack
    hlg = loads_msgpack(*dumped_hlg)
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/protocol/core.py", line 223, in loads_msgpack
    payload, object_hook=msgpack_decode_default, use_list=False, **msgpack_opts
  File "msgpack/_unpacker.pyx", line 195, in msgpack._cmsgpack.unpackb
ValueError: 2154341415 exceeds max_bin_len(2147483647)
tornado.application - ERROR - Exception in callback functools.partial(<function TCPServer._handle_connection.<locals>.<lambda> at 0x7f7058e87f80>, <Task finished coro=<BaseTCPListener._handle_stream() done, defined at /usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/comm/tcp.py:459> exception=ValueError('2154341415 exceeds max_bin_len(2147483647)')>)
Traceback (most recent call last):
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/tornado/ioloop.py", line 741, in _run_callback
    ret = callback()
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/tornado/tcpserver.py", line 331, in <lambda>
    gen.convert_yielded(future), lambda f: f.result()
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/comm/tcp.py", line 476, in _handle_stream
    await self.comm_handler(comm)
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/core.py", line 491, in handle_comm
    result = await result
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/scheduler.py", line 3247, in add_client
    await self.handle_stream(comm=comm, extra={"client": client})
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/core.py", line 563, in handle_stream
    handler(**merge(extra, msg))
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/scheduler.py", line 2382, in update_graph_hlg
    dsk, dependencies, annotations = highlevelgraph_unpack(hlg)
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/protocol/highlevelgraph.py", line 161, in highlevelgraph_unpack
    hlg = loads_msgpack(*dumped_hlg)
  File "/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/protocol/core.py", line 223, in loads_msgpack
    payload, object_hook=msgpack_decode_default, use_list=False, **msgpack_opts
  File "msgpack/_unpacker.pyx", line 195, in msgpack._cmsgpack.unpackb
ValueError: 2154341415 exceeds max_bin_len(2147483647)
---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
<ipython-input-9-e2b8073da6e7> in <module>
      1 from dask.distributed import wait
----> 2 wait([xTrainDC,yTrainDC])

/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/client.py in wait(fs, timeout, return_when)
   4257     """
   4258     client = default_client()
-> 4259     result = client.sync(_wait, fs, timeout=timeout, return_when=return_when)
   4260     return result
   4261 

/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    835         else:
    836             return sync(
--> 837                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    838             )
    839 

/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    338     if error[0]:
    339         typ, exc, tb = error[0]
--> 340         raise exc.with_traceback(tb)
    341     else:
    342         return result[0]

/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/distributed/utils.py in f()
    322             if callback_timeout is not None:
    323                 future = asyncio.wait_for(future, callback_timeout)
--> 324             result[0] = yield future
    325         except Exception as exc:
    326             error[0] = sys.exc_info()

/usr/local/share/anaconda3/envs/rapidsai/lib/python3.7/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

CancelledError: 
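
For reference, the two numbers in the ValueError above can be checked with a little arithmetic. This is only a sketch using values copied from the log; the variable names are made up for illustration. According to the traceback, the failure happens in the scheduler while it unpacks the task graph message (update_graph_hlg, then loads_msgpack), and the reported limit is the largest signed 32-bit integer.

```python
# Values copied from the log line:
#   ValueError: 2154341415 exceeds max_bin_len(2147483647)
payload_len = 2154341415   # size of the binary payload, in bytes
max_bin_len = 2147483647   # limit reported by msgpack, in bytes

# The limit is exactly the largest signed 32-bit integer.
limit_is_int32_max = (max_bin_len == 2**31 - 1)

# How far over the limit the payload is.
overshoot = payload_len - max_bin_len

print(f"limit is 2**31 - 1: {limit_is_int32_max}")
print(f"payload is {overshoot} bytes ({overshoot / 1e6:.1f} MB) over the limit")
```

So the message sent to the scheduler was a little over 2 GB, just past what a single msgpack binary field could hold in this setup.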
1 Answer

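My guess is that dask_cudf.read_csv('/home/ubuntu/dataset.csv') is failing, which leaves the underlying futures in a state other than finished. Does the CSV fit in the memory of the GPUs you are using? Could you try the following code and report the error message?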

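This tells dask to compute the results of the read_csv and iloc calls and to wait for the distributed results to finish before moving on to create the DMatrix.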

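from dask.distributed import Client, wait
from dask.utils import parse_bytes
from dask_cuda import LocalCUDACluster
import dask_cudf
import xgboost as xgb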

cluster = LocalCUDACluster(rmm_pool_size=parse_bytes("9GB"), n_workers=5, threads_per_worker=1)
client = Client(cluster)

ddb = dask_cudf.read_csv('/home/ubuntu/dataset.csv')
xTrain = ddb.iloc[:,20:].persist()
yTrain = ddb.iloc[:,1:2].persist()
wait([xTrain, yTrain])

dTrain = xgb.dask.DaskDeviceQuantileDMatrix(client=client, data=xTrain, label=yTrain)
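
If the assertion still fires, it may help to look at the status of each partition's future before building the DMatrix, since the failing check is `part.status == 'finished'`. Below is a minimal sketch, not part of XGBoost or dask: the helper names `status_counts` and `not_finished` are hypothetical, and they only assume that each object has a `.status` attribute, as distributed futures do.

```python
from collections import Counter


def status_counts(futures):
    """Count futures by their .status value ('finished', 'error', 'pending', ...)."""
    return Counter(f.status for f in futures)


def not_finished(futures):
    """Return the futures whose status is anything other than 'finished'."""
    return [f for f in futures if f.status != "finished"]
```

With a live cluster, this could be called as `status_counts(futures_of(xTrain))` after `from dask.distributed import futures_of`. For any future returned by `not_finished` whose status is `'error'`, calling `future.exception()` should give the underlying exception, which is usually more informative than the bare AssertionError.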
answered 2021-01-21T16:29:42.493