我正在使用带有 dask 的 python 3 来读取 parquet 文件列表,进行一些处理,然后将其全部放入一个新的联合 parquet 文件中以供以后使用。
该进程使用了太多内存,以至于它似乎在将所有 parquet 文件写入新 parquet 文件之前尝试将它们读入内存。
我正在使用以下代码
def t(path):
import dask.dataframe as dd
ddf = dd.read_parquet(path)
ddf["file"] = path
return ddf
b = bag.from_sequence(parquet_files)
with ProgressBar():
data = b.map(lambda x: t(x)).\
map(lambda y: dd.to_parquet(y, output_parquet_file, partition_on=["file"], append=True, engine="fastparquet")).\
compute(num_workers=1)
每次使用一个工作人员时,尤其是使用更多工作人员时,内存都会爆炸。这些文件很大(每个大约 1G),我试图从 csv 文件中读取信息并将它们分成 25M 块,并遇到了同样的问题。
我在这里想念什么?当迭代过程似乎在这里做正确的事情时,为什么它会尝试将所有内容加载到内存中?我怎样才能使用 dask 操作来做到这一点,而不会炸毁我在那台机器上的 128G 内存?
PS我尝试使用pyarrow引擎,但问题是附加尚未在dask中实现。
编辑:尝试了建议的解决方案:我现在试试这个代码
import dask.dataframe as dd
with ProgressBar():
dfs = [dd.read_parquet(pfile) for pfile in parquet_files]
for i, path in enumerate(parquet_files):
dfs[i]["file"] = path
df = dd.concat(dfs)
df.to_parquet(output_parquet_file)
尽管如此,内存还是会爆炸(在内存超过 200G 的系统上)