0

我正在尝试使用 map_partitions 在 dask 数据帧上应用一堆函数。它在本地定义函数时起作用,例如:

#assume a data frame df1

def upper(x):
    return x.str.upper()

def process(df,info): 
    for mapper,col in info['process']:
        df[col] = df[col].map_partitions(mapper, meta=df[col])
    return df

info = {'process':[(upper, 'column_name')]}
df = process(df1, info)

df.head()

但是当我们将代码拆分为模块时它不起作用..

助手.py

def upper(x):
    return x.str.upper()

def upper_lambda():
    return lambda x: x.str.upper()

主文件

import helper

#assume a data frame df1

def process(df,info): 
    for mapper,col in info['process']:
        df[col] = df[col].map_partitions(mapper, meta=df[col])
    return df

info = {'process':[(getattr(helper,'upper'), 'column_name')]}
#Tried with the lambda too.. dosent seem to work 
#info = {'process':[(helper.upper(), 'column_name')]}

df = process(df1, info)

df.head()

它只是抛出 KilledWorker:("('assign-read-parquet-head-1-5-assign-77bd7b855e5e8eec82312c65361fc7c5', 0)",

4

1 回答 1

2

Dask 当然支持使用来自另一个模块的函数。但是,这些模块应该存在于您正在使用的所有机器上。

对于像您的文件这样的小文件,helper.py您可能需要查看Client.upload_file以帮助您移动它。

于 2020-05-23T17:21:10.440 回答