我正在尝试在分布式系统上执行自定义 dask 图,问题是它似乎没有释放已完成任务的内存。难道我做错了什么?
我尝试过更改进程数并使用本地集群,但似乎没有什么不同。
from dask.distributed import Client
from dask import get
import pandas as pd
client = Client()
def get_head(df):
return df.head()
process_big_file_tasks = {f'process-big-file-{i}': (pd.read_csv, '/home/ubuntu/huge_file.csv') for i in range(50)}
return_fragment_tasks = {f'return-fragment-{i}': (get_head, previous_task) for i, previous_task in enumerate(process_big_file_tasks)}
dsk = {
**process_big_file_tasks,
**return_fragment_tasks,
'concat': (pd.concat, list(return_fragment_tasks))
}
client.get(dsk, 'concat')
由于任务是相互独立的(或者至少是那些消耗最多内存的任务),所以当每个任务完成时,应该释放它的内存。