I created a dask dataframe from multiple HDFS files and then tried to write the final dataframe back to HDFS as parquet. The write fails with a memory error.

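import dask.dataframe as dd

dask_df = <some initialization>
for parquet_hdfs_path in hdfs_files:
    # lazily read one parquet file and join its columns onto the accumulated frame by index
    df = dd.read_parquet(parquet_hdfs_path)
    dask_df = dask_df.join(df)

# nothing is computed until this write is triggered
dask_df.to_parquet(...)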

The final dask dataframe will have 1 million records and 600 columns.

Dask cluster size: 5 nodes, each with 55G of memory.
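For scale, here is a rough back-of-envelope estimate of the final frame's in-memory size compared with the cluster's total memory. It assumes every column holds 8-byte numeric values, which is only a guess since the actual dtypes are not stated here. String or object columns would take considerably more, and the chain of joins may create intermediate copies that this does not count.

```python
# Rough size estimate for the final dataframe.
# ASSUMPTION: every value is an 8-byte numeric (float64/int64); real dtypes may differ.
n_rows = 1_000_000
n_cols = 600
bytes_per_value = 8  # assumed, not taken from the actual data

estimated_bytes = n_rows * n_cols * bytes_per_value
estimated_gb = estimated_bytes / 1e9

# Cluster described above: 5 nodes with 55G each (treated as GB here).
cluster_gb = 5 * 55

print(f"estimated frame size: {estimated_gb:.1f} GB")  # 4.8 GB
print(f"total cluster memory: {cluster_gb} GB")        # 275 GB
```

Under that assumption the final result is small relative to the cluster, so the memory pressure presumably comes from the intermediate joins or from data landing on a single worker rather than from the size of the output itself.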

Exception:

File "/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/dask/dataframe/core.py", line 957, in to_parquet
[ worker demo4-dn-1 ] :     return to_parquet(path, self, *args, **kwargs)
[ worker demo4-dn-1 ] :   File "/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/dask/dataframe/io/parquet.py", line 445, in to_parquet
[ worker demo4-dn-1 ] :     writes = delayed(writes).compute()
[ worker demo4-dn-1 ] :   File "/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/dask/base.py", line 99, in compute
[ worker demo4-dn-1 ] :     (result,) = compute(self, traverse=False, **kwargs)
[ worker demo4-dn-1 ] :   File "/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/dask/base.py", line 206, in compute
[ worker demo4-dn-1 ] :     results = get(dsk, keys, **kwargs)
[ worker demo4-dn-1 ] :   File "/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/distributed/client.py", line 1934, in get
[ worker demo4-dn-1 ] :     results = self.gather(packed, asynchronous=asynchronous)
[ worker demo4-dn-1 ] :   File "/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/distributed/client.py", line 1376, in gather
[ worker demo4-dn-1 ] :     asynchronous=asynchronous)
[ worker demo4-dn-1 ] :   File "/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/distributed/client.py", line 548, in sync
[ worker demo4-dn-1 ] :     return sync(self.loop, func, *args, **kwargs)
[ worker demo4-dn-1 ] :   File "/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/distributed/utils.py", line 241, in sync
[ worker demo4-dn-1 ] :     six.reraise(*error[0])
[ worker demo4-dn-1 ] :   File "/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/six.py", line 693, in reraise
[ worker demo4-dn-1 ] :     raise value
[ worker demo4-dn-1 ] :   File "/ebs/d1/agent/conda/envs/py361/lib/python3.6/site-packages/distributed/utils.py", line 229, in f
[ worker demo4-dn-1 ] :     result[0] = yield make_coro()

Please point me in the right direction to overcome this issue.
