How do I correctly use dask delayed tasks to compute grouped quotients over multiple columns?
Some sample data:
import pandas as pd

raw_data = {
'subject_id': ['1', '2', '3', '4', '5'],
'name': ['A', 'B', 'C', 'D', 'E'],
'nationality': ['DE', 'AUT', 'US', 'US', 'US'],
'alotdifferent': ['x', 'y', 'z', 'x', 'a'],
'target': [0,0,0,1,1],
'age_group' : [1, 2, 1, 3, 1]}
df_a = pd.DataFrame(raw_data, columns = ['subject_id', 'name', 'nationality', 'alotdifferent','target','age_group'])
df_a.nationality = df_a.nationality.astype('category')
df_a.alotdifferent = df_a.alotdifferent.astype('category')
df_a.name = df_a.name.astype('category')
Some setup code that determines the string/categorical columns:
FACTOR_FIELDS = df_a.select_dtypes(include=['category']).columns
columnsToDrop = ['alotdifferent']
columnsToBias_keep = FACTOR_FIELDS[~FACTOR_FIELDS.isin(columnsToDrop)]
target = 'target'
The main part, computing the grouped quotients:
def compute_weights(da, colname):
    # group only a single time
    grouped = da.groupby([colname, target]).size()
    # calculate first ratio
    df = grouped / da[target].sum()
    nameCol = "pre_" + colname
    grouped_res = df.reset_index(name=nameCol)
    grouped_res = grouped_res[grouped_res[target] == 1]
    # pass the column by keyword; the positional axis argument was removed in pandas 2.0
    grouped_res = grouped_res.drop(columns=target)
    # todo persist the result in dict for transformer
    result_1 = grouped_res
    return result_1, nameCol
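Reading the code above, the value stored in column `pre_<colname>` for a category value v of column c appears to be the following quotient, writing y_i for the 0/1 `target` of row i (the numerator comes from the `(v, 1)` group size, the denominator from `da[target].sum()`):

```latex
\mathrm{pre}_c(v) = \frac{\left|\{\, i : c_i = v,\ y_i = 1 \,\}\right|}{\sum_i y_i}
```

With the sample data, for example, both positive rows have nationality US, so pre_nationality(US) = 2/2 = 1.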
Now actually call it on multiple columns:
original = df_a.copy()
output_df = original
ratio_weights = {}
for colname in columnsToBias_keep.union(columnsToDrop):
    # compute_weights returns exactly two values: the ratio frame and its column name
    result_1, nameCol = compute_weights(original, colname)
    # persist the result in dict for transformer
    # this is required to separate fit and transform stage (later on in a sklearn transformer)
    ratio_weights[nameCol] = result_1
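When I try to use dask delayed, I have to call compute, which breaks the DAG. How can I fix this so that a single large computation graph is built and then computed in parallel?
from dask import delayed

compute_weights = delayed(compute_weights)
# assumed: one delayed call for a single column; this only builds a lazy task
delayed_res_name = compute_weights(original, colname)
# calling .compute() here executes the graph immediately, once per column
a, b = delayed_res_name.compute()
ratio_weights = {}
ratio_weights[b] = a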
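The usual dask pattern for this is to not call `.compute()` on each task, but to collect all the `Delayed` objects first and pass them to a single `dask.compute(...)` call, which evaluates them from one combined graph. Below is a minimal sketch, assuming dask is installed and that each per-column result fits in memory. It restates the question's sample data and function; the `observed=True` argument to `groupby` is an assumption added here so the output does not depend on the pandas version's default for categorical groupers, and `columns` and `tasks` are illustrative names.

```python
import dask
import pandas as pd
from dask import delayed

# Sample data from the question, with the same categorical columns.
raw_data = {
    'subject_id': ['1', '2', '3', '4', '5'],
    'name': ['A', 'B', 'C', 'D', 'E'],
    'nationality': ['DE', 'AUT', 'US', 'US', 'US'],
    'alotdifferent': ['x', 'y', 'z', 'x', 'a'],
    'target': [0, 0, 0, 1, 1],
    'age_group': [1, 2, 1, 3, 1]}
df_a = pd.DataFrame(raw_data)
for col in ['name', 'nationality', 'alotdifferent']:
    df_a[col] = df_a[col].astype('category')

target = 'target'


def compute_weights(da, colname):
    # observed=True is an assumption: only observed (value, target) pairs are kept,
    # independent of the pandas version's default for categorical groupers.
    grouped = da.groupby([colname, target], observed=True).size()
    # size of each (value, target) group divided by the number of positive rows
    ratios = grouped / da[target].sum()
    name_col = "pre_" + colname
    res = ratios.reset_index(name=name_col)
    # keep only the target == 1 rows, then drop the target column
    res = res[res[target] == 1].drop(columns=target)
    return res, name_col


columns = ['alotdifferent', 'name', 'nationality']

# Build one lazy task per column; nothing is executed yet.
tasks = [delayed(compute_weights)(df_a, col) for col in columns]

# A single dask.compute call runs every task from one combined graph and
# returns a tuple with one (DataFrame, column name) pair per task.
results = dask.compute(*tasks)

ratio_weights = {name_col: frame for frame, name_col in results}
```

The `Delayed` objects in `tasks` could also be passed into further `delayed` calls before the single `dask.compute`, which keeps a longer pipeline in one graph. Since most of the work happens inside pandas, the default threaded scheduler may not give much speedup on small frames; `dask.compute` also accepts a `scheduler=` argument (for example `"processes"`) if that turns out to matter.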