1

正如标题所说,我无法运行此代码:

def simple_map(x):
    y = seasonal_decompose(x,model='additive',extrapolate_trend='freq',period=7,two_sided=False)
    return y.trend

b.map_partitions(simple_map,meta=b).compute()

其中 b 是一个 dask DataFrame,以 datetime 作为索引,以一系列浮点数作为列,seasonal_decompose 是 statsmodel 之一。

这就是我得到的:

Index(...) must be called with a collection of some kind, 'seasonal' was passed

如果我做:

b.apply(simple_map,axis=0)

其中 b 是 pandas DataFrame 我得到了我想要的。

我哪里错了?

#

可重现的例子:

import pandas as pd
from statsmodels.tsa.seasonal import seasonal_decompose

d = {'Val1': [3, 2,7,5], 'Val2': [2, 4,8,6]}
b=pd.DataFrame(data=d)
b=b.set_index(pd.to_datetime(['25/12/1991','26/12/1991','27/12/1991','28/12/1991']))

def simple_map(x):
    y =seasonal_decompose(x,model='additive',extrapolate_trend='freq',period=2,two_sided=False)
    return y.trend

b.apply(simple_map,axis=0)

            Val1    Val2
1991-12-25  0.70    0.9
1991-12-26  2.10    2.7
1991-12-27  3.50    4.5
1991-12-28  5.25    6.5

这就是我想用 dask 做的,但我不能

实际上:

import dask.dataframe as dd

c=dd.from_pandas(b, npartitions=1)
c.map_partitions(simple_map,meta=c).compute()

产生上述指定的错误。

4

1 回答 1

1

谢谢你的例子!

从应用的文档字符串

传递给函数的对象是 Series 对象,其索引是 DataFrame 的索引 ( axis=0)

但是,map_partitions将在整个 Dataframe 上工作。我建议稍微重写函数:

 def simple_map_2(x):
     xVal1 = seasonal_decompose(x.Val1,model='additive',extrapolate_trend='freq',period=2,two_sided=False)
     xVal2 = seasonal_decompose(x.Val2,model='additive',extrapolate_trend='freq',period=2,two_sided=False)
     return pd.DataFrame({'Val1': xVal1.trend, 'Val2': xVal2.trend})

c.map_partitions(simple_map_2,meta=make_meta(c)).compute()

            Val1  Val2
1991-12-25  0.70   0.9
1991-12-26  2.10   2.7
1991-12-27  3.50   4.5
1991-12-28  5.25   6.5
于 2020-05-28T17:08:42.870 回答