1

我正在尝试对使用大量工作人员的工作人员的大文件进行基本的ETLdask-cudf工作流程。

问题:

最初,scheduler计划在工作人员之间读取的数量相等,partitions但在预处理期间,它倾向于在工作人员之间分配/打乱它们。

一个工作人员获得的最小分区数是4,它获得的最大分区数是19total partitions= apprx. 300num_workers= 22)这种行为会导致下游问题,因为我希望在工作人员之间平均分配分区。

有没有办法防止这种行为?

我认为下面会对此有所帮助,但事实并非如此。

# limit work-stealing as much as possible
dask.config.set({'distributed.scheduler.work-stealing': False})
dask.config.set({'distributed.scheduler.bandwidth': 1})

正在完成的工作流程:

  • 填满
  • 向下转换/其他逻辑

df = dask_cudf.read_csv(path = `big_files`,
                        names = names,
                        delimiter='\t',
                        dtype = read_dtype_ls,
                        chunksize=chunksize)


df = df.map_partitions(lambda df:df.fillna(-1))

def transform_col_int64_to_int32(df, columns):
    """
        This function casts int64s columns to int32s 
        we are using this to transform int64s to int32s and overflows seem to be consitent
    """
    for col in columns:
        df[col] = df[col].astype(np.int32)
    return df

df = df.map_partitions(transform_col_int64_to_int32,cat_col_names)
df = df.persist()

4

1 回答 1

1

Dask 计划基于多种因素运行任务,包括数据依赖关系、运行时、内存使用等。通常,这些问题的答案是“让它做它的事”。调度不佳的最常见原因是块太少。

但是,如果您明确需要更平衡的分布,那么您可以尝试Client.rebalance方法。

wait(df)
client.rebalance(df)

但是请注意,再平衡不像其他 Dask 操作那样健壮。最好在没有大量其他工作正在进行的时候进行(因此调用dask.distributed.wait上面的方法)。

另外,我会开启偷工减料。工作窃取是负载平衡的另一个名称。

于 2019-10-04T18:47:39.210 回答