
Custom dask GroupBy Aggregations are very convenient, but I am having a hard time defining one that works for the most frequent value in a column.

What I have:

So, following the example here, we can define a custom aggregation function like this:

custom_sum = dd.Aggregation('custom_sum', lambda s: s.sum(), lambda s0: s0.sum())
my_aggregate = {
    'A': custom_sum,
    'B': custom_most_often_value, ### <<< This is the goal.
    'C': ['max','min','mean'],
    'D': ['max','min','mean']
}
col_name = 'Z'
ddf_agg = ddf.groupby(col_name).agg(my_aggregate).compute()

While this works for custom_sum (as on the example page), an adaptation for the most frequent value might look like this (based on the example here):

custom_most_often_value = dd.Aggregation('custom_most_often_value', lambda x:x.value_counts().index[0], lambda x0:x0.value_counts().index[0])

but it yields

ValueError: Metadata inference failed in `_agg_finalize`.

You have supplied a custom function and Dask is unable to 
determine the type of output that that function returns. 

I then tried to find a meta keyword in the implementation of dd.Aggregation in order to define it, but could not find one. And the fact that it is not needed in the custom_sum example makes me think the error lies elsewhere.

So my question is: how do I get the most frequent value of a column in df.groupby(..).agg(..)? Thanks!


2 Answers


A quick clarification rather than an answer: the meta parameter is used in the .agg() method to specify the column data types you expect, ideally expressed as a zero-length pandas DataFrame. Otherwise, Dask will feed your function dummy data to try to guess those types, which does not always work.
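
A minimal sketch of what such a meta specification can look like (the data and column names are made up for illustration). Whether your dask version lets you pass meta directly to .agg() varies, so the runnable call below uses groupby(...).apply(), which does accept it:

import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(
    pd.DataFrame({'Z': ['a', 'a', 'b'], 'B': ['x', 'x', 'y']}),
    npartitions=2)

# meta describes the expected output: for a per-group scalar result a
# (name, dtype) tuple is enough; for a frame-shaped result use a zero-length
# DataFrame, e.g. pd.DataFrame({'B': pd.Series(dtype='object')}).
# Supplying it skips Dask's dummy-data type inference that raised the
# ValueError above.
most_often = ddf.groupby('Z').apply(
    lambda g: g['B'].value_counts().index[0],
    meta=('B', 'object'))

print(most_often.compute())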

Answered 2020-08-10T17:35:56.740

The problem you are running into is that the separate stages of the aggregation cannot be the same function applied recursively, as they are in the custom_sum example you are looking at.
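
To see why, here is a small standalone illustration (not part of the answer's code): the per-partition modes can miss the global mode entirely, whereas a sum of partition sums is always the global sum.

import pandas as pd

# Three "partitions" of one column.
parts = [pd.Series(['x', 'x', 'y']),
         pd.Series(['z', 'z', 'y']),
         pd.Series(['w', 'w', 'y'])]

# Applying value_counts().index[0] per partition gives ['x', 'z', 'w'] ...
partition_modes = [p.value_counts().index[0] for p in parts]

# ... but the global mode is 'y' (count 3), which wins in no partition.
global_mode = pd.concat(parts).value_counts().index[0]
print(partition_modes, global_mode)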

I have adapted the code from this answer, keeping @user8570642's comments because they are very helpful. Note that this method will work for a list of groupby keys: https://stackoverflow.com/a/46082075/3968619

def chunk(s):
    # for the comments, assume only a single grouping column, the 
    # implementation can handle multiple group columns.
    #
    # s is a grouped series. value_counts creates a multi-series like 
    # (group, value): count
    return s.value_counts()


def agg(s):
#     print('agg',s.apply(lambda s: s.groupby(level=-1).sum()))
    # s is a grouped multi-index series. In .apply the full sub-df will passed
    # multi-index and all. Group on the value level and sum the counts. The
    # result of the lambda function is a series. Therefore, the result of the 
    # apply is a multi-index series like (group, value): count
    return s.apply(lambda s: s.groupby(level=-1).sum())
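    # NOTE: the return above is the version actually used; the alternative
    # below is unreachable unless that return is removed.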

    # faster version using pandas internals
    s = s._selected_obj
    return s.groupby(level=list(range(s.index.nlevels))).sum()


def finalize(s):
    # s is a multi-index series of the form (group, value): count. First
    # manually group on the group part of the index. The lambda will receive a
    # sub-series with multi index. Next, drop the group part from the index.
    # Finally, determine the index with the maximum value, i.e., the mode.
    level = list(range(s.index.nlevels - 1))
    return (
        s.groupby(level=level)
        .apply(lambda s: s.reset_index(level=level, drop=True).idxmax())
    )

max_occurence = dd.Aggregation('mode', chunk, agg, finalize)

chunk will count the values of the groupby object in each partition. agg will take the results from chunk, group them by the original groupby key, and sum the value counts, so that we get the value counts for every group. finalize will take the multi-index series provided by agg and return the most frequently occurring value of B for each group of Z.

Here is a test case:

df = dd.from_pandas(
    pd.DataFrame({"A":[1,1,1,1,2,2,3]*10,"B":[5,5,5,5,1,1,1]*10,
                  'Z':['mike','amy','amy','amy','chris','chris','sandra']*10}), npartitions=10)
res = df.groupby(['Z']).agg({'B': max_occurence}).compute()
print(res)
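
For this data the result should report B == 5 for the groups amy and mike and B == 1 for chris and sandra, since those are the only B values occurring in those groups.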
Answered 2020-08-10T18:31:01.257