2

在分布式作业上运行 Dask 时,调度程序出现以下错误:

distributed.core - ERROR -
Traceback (most recent call last):
  File "/usr/local/lib/python3.4/dist-packages/distributed/core.py", line 269, in write
    frames = protocol.dumps(msg)
  File "/usr/local/lib/python3.4/dist-packages/distributed/protocol.py", line 81, in dumps
    frames = dumps_msgpack(small)
  File "/usr/local/lib/python3.4/dist-packages/distributed/protocol.py", line 153, in dumps_msgpack
    payload = msgpack.dumps(msg, use_bin_type=True)
  File "/usr/local/lib/python3.4/dist-packages/msgpack/__init__.py", line 47, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 231, in msgpack._packer.Packer.pack (msgpack/_packer.cpp:231)
  File "msgpack/_packer.pyx", line 239, in msgpack._packer.Packer.pack (msgpack/_packer.cpp:239)
MemoryError

这是调度程序或其中一名工作人员的内存不足吗?或两者??

4

1 回答 1

2

此错误的最常见原因是尝试收集太多数据,例如在以下使用 dask.dataframe 的示例中发生:

df = dd.read_csv('s3://bucket/lots-of-data-*.csv')
df.compute()

这会将所有数据加载到整个集群的 RAM 中(这很好),然后尝试通过调度程序将整个结果带回本地机器(这可能无法处理全部 100 GB 的数据)一个地方。)工作人员到客户端的通信通过调度程序,因此它是第一台接收所有数据的单机,并且第一台可能发生故障的机器。

如果是这种情况,那么您可能希望使用该Executor.persist方法来触发计算,但将其保留在集群上。

df = dd.read_csv('s3://bucket/lots-of-data-*.csv')
df = e.persist(df)

通常我们只df.compute()用于我们想在本地会话中查看的小结果。

于 2016-07-23T13:43:39.450 回答