24

我正在寻找一种方法来执行类似的各种rolling_*功能pandas,但我希望滚动计算的窗口由一系列值(例如,DataFrame 的一列的值范围)定义,而不是由窗口中的行数。

例如,假设我有以下数据:

>>> print d
   RollBasis  ToRoll
0          1       1
1          1       4
2          1      -5
3          2       2
4          3      -4
5          5      -2
6          8       0
7         10     -13
8         12      -2
9         13      -5

如果我做类似的事情rolling_sum(d, 5),我会得到一个滚动总和,其中每个窗口包含 5 行。但我想要的是一个滚动总和,其中每个窗口都包含一定范围的RollBasis. 也就是说,我希望能够执行类似的操作d.roll_by(sum, 'RollBasis', 5),并得到一个结果,其中第一个窗口包含所有RollBasis介于 1 和 5 之间的行,然后第二个窗口包含所有RollBasis介于 2 和 6 之间的行,然后是第三个窗口窗口包含所有RollBasis在 3 到 7 之间的行,以此类推。窗口的行数不相等,但RollBasis每个窗口中选择的值范围是相同的。所以输出应该是这样的:

>>> d.roll_by(sum, 'RollBasis', 5)
    1    -4    # sum of elements with 1 <= Rollbasis <= 5
    2    -4    # sum of elements with 2 <= Rollbasis <= 6
    3    -6    # sum of elements with 3 <= Rollbasis <= 7
    4    -2    # sum of elements with 4 <= Rollbasis <= 8
    # etc.

我不能这样做groupby,因为groupby总是产生不相交的组。我不能用滚动函数来做到这一点,因为它们的窗口总是按行数滚动,而不是按值滚动。那么我该怎么做呢?

4

4 回答 4

19

我认为这可以满足您的要求:

In [1]: df
Out[1]:
   RollBasis  ToRoll
0          1       1
1          1       4
2          1      -5
3          2       2
4          3      -4
5          5      -2
6          8       0
7         10     -13
8         12      -2
9         13      -5

In [2]: def f(x):
   ...:     ser = df.ToRoll[(df.RollBasis >= x) & (df.RollBasis < x+5)]
   ...:     return ser.sum()

上面的函数接受一个值,在本例中为 RollBasis,然后根据该值索引数据框列 ToRoll。返回的系列由满足 RollBasis + 5 标准的 ToRoll 值组成。最后,对该系列求和并返回。

In [3]: df['Rolled'] = df.RollBasis.apply(f)

In [4]: df
Out[4]:
   RollBasis  ToRoll  Rolled
0          1       1      -4
1          1       4      -4
2          1      -5      -4
3          2       2      -4
4          3      -4      -6
5          5      -2      -2
6          8       0     -15
7         10     -13     -20
8         12      -2      -7
9         13      -5      -5

玩具示例 DataFrame 的代码,以防其他人想尝试:

In [1]: from pandas import *

In [2]: import io

In [3]: text = """\
   ...:    RollBasis  ToRoll
   ...: 0          1       1
   ...: 1          1       4
   ...: 2          1      -5
   ...: 3          2       2
   ...: 4          3      -4
   ...: 5          5      -2
   ...: 6          8       0
   ...: 7         10     -13
   ...: 8         12      -2
   ...: 9         13      -5
   ...: """

In [4]: df = read_csv(io.BytesIO(text), header=0, index_col=0, sep='\s+')
于 2013-01-13T19:49:12.047 回答
16

根据 Zelazny7 的回答,我创建了这个更通用的解决方案:

def rollBy(what, basis, window, func):
    def applyToWindow(val):
        chunk = what[(val<=basis) & (basis<val+window)]
        return func(chunk)
    return basis.apply(applyToWindow)

>>> rollBy(d.ToRoll, d.RollBasis, 5, sum)
0    -4
1    -4
2    -4
3    -4
4    -6
5    -2
6   -15
7   -20
8    -7
9    -5
Name: RollBasis

它仍然不理想,因为它与 相比非常慢rolling_apply,但也许这是不可避免的。

于 2013-01-13T20:46:14.157 回答
13

基于 BrenBarns 的回答,但通过使用基于标签的索引而不是基于布尔的索引来加速:

def rollBy(what,basis,window,func,*args,**kwargs):
    #note that basis must be sorted in order for this to work properly     
    indexed_what = pd.Series(what.values,index=basis.values)
    def applyToWindow(val):
        # using slice_indexer rather that what.loc [val:val+window] allows
        # window limits that are not specifically in the index
        indexer = indexed_what.index.slice_indexer(val,val+window,1)
        chunk = indexed_what[indexer]
        return func(chunk,*args,**kwargs)
    rolled = basis.apply(applyToWindow)
    return rolled

这比不使用索引列要快得多

In [46]: df = pd.DataFrame({"RollBasis":np.random.uniform(0,1000000,100000), "ToRoll": np.random.uniform(0,10,100000)})

In [47]: df = df.sort("RollBasis")

In [48]: timeit("rollBy_Ian(df.ToRoll,df.RollBasis,10,sum)",setup="from __main__ import rollBy_Ian,df", number =3)
Out[48]: 67.6615059375763

In [49]: timeit("rollBy_Bren(df.ToRoll,df.RollBasis,10,sum)",setup="from __main__ import rollBy_Bren,df", number =3)
Out[49]: 515.0221037864685

值得注意的是,基于索引的解决方案是 O(n),而逻辑切片版本在平均情况下是 O(n^2)(我认为)。

我发现在从 Basis 的最小值到 Basis 的最大值的均匀间隔的窗口上执行此操作比在每个基础值上执行此操作更有用。这意味着改变功能:

def rollBy(what,basis,window,func,*args,**kwargs):
    #note that basis must be sorted in order for this to work properly
    windows_min = basis.min()
    windows_max = basis.max()
    window_starts = np.arange(windows_min, windows_max, window)
    window_starts = pd.Series(window_starts, index = window_starts)
    indexed_what = pd.Series(what.values,index=basis.values)
    def applyToWindow(val):
        # using slice_indexer rather that what.loc [val:val+window] allows
        # window limits that are not specifically in the index
        indexer = indexed_what.index.slice_indexer(val,val+window,1)
        chunk = indexed_what[indexer]
        return func(chunk,*args,**kwargs)
    rolled = window_starts.apply(applyToWindow)
    return rolled
于 2014-01-17T15:22:13.390 回答
0

为了扩展@Ian Sudbury的答案,我将其扩展为可以通过将方法绑定到 DataFrame 类直接在数据帧上使用它(我希望我的代码在速度上肯定会有一些改进,因为我不知道如何访问类的所有内部)。

我还添加了面向后向窗口和居中窗口的功能。只有当您远离边缘时,它们才能完美运行。

import pandas as pd
import numpy as np

def roll_by(self, basis, window, func, forward=True, *args, **kwargs):
    the_indexed = pd.Index(self[basis])
    def apply_to_window(val):
        if forward == True:
            indexer = the_indexed.slice_indexer(val, val+window)
        elif forward == False:
            indexer = the_indexed.slice_indexer(val-window, val)
        elif forward == 'both':
            indexer = the_indexed.slice_indexer(val-window/2, val+window/2)
        else:
            raise RuntimeError('Invalid option for "forward". Can only be True, False, or "both".')
        chunck = self.iloc[indexer]
        return func(chunck, *args, **kwargs)
    rolled = self[basis].apply(apply_to_window)
    return rolled

pd.DataFrame.roll_by = roll_by

对于其他测试,我使用了以下定义:

def rollBy_Ian_iloc(what,basis,window,func,*args,**kwargs):
    #note that basis must be sorted in order for this to work properly   
    indexed_what = pd.Series(what.values,index=basis.values)
    def applyToWindow(val):
        # using slice_indexer rather that what.loc [val:val+window] allows
        # window limits that are not specifically in the index
        indexer = indexed_what.index.slice_indexer(val,val+window,1)
        chunk = indexed_what.iloc[indexer]
        return func(chunk,*args,**kwargs)
    rolled = basis.apply(applyToWindow)
    return rolled

def rollBy_Ian_index(what,basis,window,func,*args,**kwargs):
    #note that basis must be sorted in order for this to work properly   
    indexed_what = pd.Series(what.values,index=basis.values)
    def applyToWindow(val):
        # using slice_indexer rather that what.loc [val:val+window] allows
        # window limits that are not specifically in the index
        indexer = indexed_what.index.slice_indexer(val,val+window,1)
        chunk = indexed_what[indexed_what.index[indexer]]
        return func(chunk,*args,**kwargs)
    rolled = basis.apply(applyToWindow)
    return rolled

def rollBy_Bren(what, basis, window, func):
    def applyToWindow(val):
        chunk = what[(val<=basis) & (basis<val+window)]
        return func(chunk)
    return basis.apply(applyToWindow)

时间和测试:

df = pd.DataFrame({"RollBasis":np.random.uniform(0,100000,10000), "ToRoll": np.random.uniform(0,10,10000)}).sort_values("RollBasis")
In [14]: %timeit rollBy_Ian_iloc(df.ToRoll,df.RollBasis,10,sum)
    ...: %timeit rollBy_Ian_index(df.ToRoll,df.RollBasis,10,sum)
    ...: %timeit rollBy_Bren(df.ToRoll,df.RollBasis,10,sum)
    ...: %timeit df.roll_by('RollBasis', 10, lambda x: x['ToRoll'].sum())
    ...: 
484 ms ± 28.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
1.58 s ± 10.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
3.12 s ± 22.7 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
1.48 s ± 45.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

结论:绑定的方法不如@Ian Sudbury的方法快,但也没有@BrenBarn的慢,但它确实为可以调用它们的函数提供了更大的灵活性。

于 2021-11-09T13:51:33.323 回答