I'm new to Dask and exported a pandas DataFrame to Parquet using row groups:
x.to_parquet(path + 'ohlcv_TRX-PERP_978627_rowgrouped.prq', row_group_size=1000)
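In case it's relevant, here is how I'd verify the row-group layout from the Parquet metadata (just a pyarrow sketch; it assumes pyarrow is the engine being used):

import pyarrow.parquet as pq

# Check that the file really was written with ~1000-row row groups.
pf = pq.ParquetFile(path + 'ohlcv_TRX-PERP_978627_rowgrouped.prq')
print(pf.metadata.num_row_groups)          # number of row groups in the file
print(pf.metadata.row_group(0).num_rows)   # rows in the first group (expected: 1000)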
Then I tried to load it with Dask, which seems to work fine (?):
import dask.dataframe as dd

x = dd.read_parquet(path + 'ohlcv_TRX-PERP_978627_rowgrouped.prq')
x
# Note: the dataframe has almost 2000 columns; I trimmed the output here
Dask DataFrame Structure:
open h
npartitions=978
2019-07-21 23:55:00 float64 floa
2019-07-22 16:35:00 ...
... ...
2021-05-30 17:06:00 ...
2021-05-31 03:32:00 ...
Dask Name: read-parquet, 978 tasks
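To get a sense of scale, this is how I'd check how heavy a single partition is in memory (get_partition, memory_usage and npartitions are standard Dask DataFrame APIs; with ~2000 float64 columns I expect each partition to be fairly large):

# Rough sanity check: materialise only the first partition and measure its in-memory size.
first = x.get_partition(0)
print(first.memory_usage(deep=True).sum().compute())  # bytes for one partition
print(x.npartitions)                                   # 978 partitions in total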
So far, no problems. But when I call x.max().compute(), Dask seems to load the entire dataset into RAM (at least memory usage climbs like crazy) and then crashes. Looking at just max():
x = x.max()
x
Dask Series Structure:
npartitions=1
ACCBL_10 float64
volume ...
dtype: float64
Dask Name: dataframe-max-agg, 1957 tasks
According to the Dask tutorial (https://tutorial.dask.org/04_dataframe.html#Computations-with-dask.dataframe), as far as I understand it, this should just work (?). It also runs out of memory when I try to call max() on only a single column:
x.open.max().compute()
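For comparison, this is how I'd restrict the read to a single column up front (the columns argument of dd.read_parquet; whether that actually avoids the memory blow-up here is exactly what I'm unsure about):

# Sketch: read only the 'open' column instead of selecting it after the fact.
x_open = dd.read_parquet(path + 'ohlcv_TRX-PERP_978627_rowgrouped.prq', columns=['open'])
print(x_open['open'].max().compute())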
Am I doing something wrong, or is this how it's supposed to work and I need to do something differently?
I've now also tried the distributed scheduler and set a memory limit on the client, but Dask still ate up 24GB of RAM and only printed a warning that the worker is far over the configured memory limit:
from dask.distributed import Client
import dask.dataframe as dd

if __name__ == '__main__':
    client = Client(processes=False, memory_limit='5GB')
    x = dd.read_parquet(path + 'ohlcv_TRX-PERP_978627_rowgrouped.prq')
    print(x)
    s = x.max().compute()
    print(s)
distributed.worker - WARNING - Memory use is high but worker has no data to store to disk. Perhaps some other process is leaking memory? Process memory: 24.13 GB -- Worker memory limit: 5.00 GB
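For completeness, the explicit LocalCluster version of the same setup that I have in mind looks roughly like this (a sketch; n_workers, threads_per_worker and memory_limit are standard LocalCluster arguments, and the values are just examples, I haven't confirmed it changes the behaviour):

from dask.distributed import Client, LocalCluster
import dask.dataframe as dd

if __name__ == '__main__':
    # Separate worker processes, each capped at 5GB (example values).
    cluster = LocalCluster(n_workers=2, threads_per_worker=2, memory_limit='5GB')
    client = Client(cluster)
    x = dd.read_parquet(path + 'ohlcv_TRX-PERP_978627_rowgrouped.prq')
    print(x.max().compute())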