2

我正在使用带有 dask 的 python 3 来读取 parquet 文件列表,进行一些处理,然后将其全部放入一个新的联合 parquet 文件中以供以后使用。

该进程使用了​​太多内存,以至于它似乎在将所有 parquet 文件写入新 parquet 文件之前尝试将它们读入内存。

我正在使用以下代码

def t(path):
    import dask.dataframe as dd
    ddf = dd.read_parquet(path)
    ddf["file"] = path
    return ddf

b = bag.from_sequence(parquet_files)
with ProgressBar():
       data = b.map(lambda x: t(x)).\
              map(lambda y: dd.to_parquet(y, output_parquet_file, partition_on=["file"], append=True, engine="fastparquet")).\
           compute(num_workers=1)

每次使用一个工作人员时,尤其是使用更多工作人员时,内存都会爆炸。这些文件很大(每个大约 1G),我试图从 csv 文件中读取信息并将它们分成 25M 块,并遇到了同样的问题。

我在这里想念什么?当迭代过程似乎在这里做正确的事情时,为什么它会尝试将所有内容加载到内存中?我怎样才能使用 dask 操作来做到这一点,而不会炸毁我在那台机器上的 128G 内存?

PS我尝试使用pyarrow引擎,但问题是附加尚未在dask中实现。

编辑:尝试了建议的解决方案:我现在试试这个代码

import dask.dataframe as dd
with ProgressBar():
    dfs = [dd.read_parquet(pfile) for pfile in parquet_files]
    for i, path in enumerate(parquet_files):
        dfs[i]["file"] = path
    df = dd.concat(dfs)
    df.to_parquet(output_parquet_file)

尽管如此,内存还是会爆炸(在内存超过 200G 的系统上)

4

2 回答 2

2

在另一个集合的地图中使用 dask 集合方法很奇怪。您可以bag.map像这样使用并直接调用 fastaprquet 函数,或者更好(取决于您需要执行的处理),对所有内容使用数据帧 API:

dfs = [dd.read_parquet(pfile, ...) for pfile in parquet_files]
df = dd.concat(dfs)
df.to_parquet(...)

请注意,尽管您尝试附加到单个文件(我认为),但 parquet 格式并没有真正从中受益,您也可以让 Dask 每个分区写入一个文件。

于 2019-08-04T19:40:28.383 回答
0

dask 支持将多个 parquet 文件作为分区读取。直接调用就行了。

import dask.dataframe as dd
ddf = dd.read_parquet("parquets/*.parquet")
ddf = ddf.map_partitions(lambda df: df*2)
ddf.to_parquet("result.parquet")
于 2021-06-11T02:23:21.103 回答