I created a dask dataframe from multiple HDFS files and then tried to write the final dataframe back to HDFS as parquet, but the write fails with a memory error.
import dask.dataframe as dd

dask_df = <some initialization>
for parquet_hdfs_path in hdfs_files:
    df = dd.read_parquet(parquet_hdfs_path)
    dask_df = dask_df.join(df)  # index-aligned join onto the accumulated frame
dask_df.to_parquet(...)
The final dask dataframe will have 1 million records and 600 columns.
Dask cluster size: 5 nodes, each with 55 GB of memory.
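A rough size estimate puts these numbers in proportion. It assumes every column is an 8-byte numeric dtype (float64/int64), which the question does not state; string or object columns would take considerably more memory. Under that assumption the final frame is about 4.8 GB against roughly 275 GB of aggregate cluster memory, so the raw data volume alone should fit comfortably, and the error more plausibly comes from memory piling up on individual workers or tasks (an inference, not something the traceback confirms).

```python
# Back-of-envelope size of the final frame.
# ASSUMPTION: all 600 columns are 8-byte numeric values (float64/int64).
ROWS = 1_000_000
COLS = 600
BYTES_PER_VALUE = 8

frame_bytes = ROWS * COLS * BYTES_PER_VALUE
cluster_bytes = 5 * 55 * 10**9  # 5 nodes x 55 GB, treating GB as 10**9 bytes

print(f"final frame ~ {frame_bytes / 10**9:.1f} GB")      # 4.8 GB
print(f"cluster RAM ~ {cluster_bytes / 10**9:.0f} GB")     # 275 GB
print(f"headroom    ~ {cluster_bytes / frame_bytes:.0f}x")  # 57x
```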
Exception:
File "/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/dask/dataframe/core.py", line 957, in to_parquet
[ worker demo4-dn-1 ] : return to_parquet(path, self, *args, **kwargs)
[ worker demo4-dn-1 ] : File "/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/dask/dataframe/io/parquet.py", line 445, in to_parquet
[ worker demo4-dn-1 ] : writes = delayed(writes).compute()
[ worker demo4-dn-1 ] : File "/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/dask/base.py", line 99, in compute
[ worker demo4-dn-1 ] : (result,) = compute(self, traverse=False, **kwargs)
[ worker demo4-dn-1 ] : File "/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/dask/base.py", line 206, in compute
[ worker demo4-dn-1 ] : results = get(dsk, keys, **kwargs)
[ worker demo4-dn-1 ] : File "/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/distributed/client.py", line 1934, in get
[ worker demo4-dn-1 ] : results = self.gather(packed, asynchronous=asynchronous)
[ worker demo4-dn-1 ] : File "/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/distributed/client.py", line 1376, in gather
[ worker demo4-dn-1 ] : asynchronous=asynchronous)
[ worker demo4-dn-1 ] : File "/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/distributed/client.py", line 548, in sync
[ worker demo4-dn-1 ] : return sync(self.loop, func, *args, **kwargs)
[ worker demo4-dn-1 ] : File "/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/distributed/utils.py", line 241, in sync
[ worker demo4-dn-1 ] : six.reraise(*error[0])
[ worker demo4-dn-1 ] : File "/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/six.py", line 693, in reraise
[ worker demo4-dn-1 ] : raise value
[ worker demo4-dn-1 ] : File "/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/distributed/utils.py", line 229, in f
[ worker demo4-dn-1 ] : result[0] = yield make_coro()
Could you point me in the right direction to get past this problem?