我正在尝试对使用大量工作人员的工作人员的大文件进行基本的ETLdask-cudf
工作流程。
问题:
最初,scheduler
计划在工作人员之间读取的数量相等,partitions
但在预处理期间,它倾向于在工作人员之间分配/打乱它们。
一个工作人员获得的最小分区数是4
,它获得的最大分区数是19
(total partitions
= apprx. 300
,num_workers
= 22
)这种行为会导致下游问题,因为我希望在工作人员之间平均分配分区。
有没有办法防止这种行为?
我认为下面会对此有所帮助,但事实并非如此。
# limit work-stealing as much as possible
dask.config.set({'distributed.scheduler.work-stealing': False})
dask.config.set({'distributed.scheduler.bandwidth': 1})
正在完成的工作流程:
- 读
- 填满
- 向下转换/其他逻辑
df = dask_cudf.read_csv(path = `big_files`,
names = names,
delimiter='\t',
dtype = read_dtype_ls,
chunksize=chunksize)
df = df.map_partitions(lambda df:df.fillna(-1))
def transform_col_int64_to_int32(df, columns):
"""
This function casts int64s columns to int32s
we are using this to transform int64s to int32s and overflows seem to be consitent
"""
for col in columns:
df[col] = df[col].astype(np.int32)
return df
df = df.map_partitions(transform_col_int64_to_int32,cat_col_names)
df = df.persist()