30

我有一个日期范围和每个日期的测量值。我想计算每个日期的指数移动平均值。有人知道怎么做这个吗?

我是 python 新手。标准python库中似乎没有内置平均值,这让我觉得有点奇怪。也许我没有找对地方。

那么,给定以下代码,我如何计算日历日期的 IQ 点的移动加权平均值?

from datetime import date
days = [date(2008,1,1), date(2008,1,2), date(2008,1,7)]
IQ = [110, 105, 90]

(可能有更好的方法来构建数据,任何建议将不胜感激)

4

16 回答 16

23

编辑:似乎mov_average_expw()来自SciKits的scikits.timeseries.lib.movi ​​ng_funcs子模块的功能(补充SciPy的附加工具包)更适合您问题的措辞。


要使用平滑因子计算数据的指数平滑alpha(1 - alpha)用维基百科的术语):

>>> alpha = 0.5
>>> assert 0 < alpha <= 1.0
>>> av = sum(alpha**n.days * iq 
...     for n, iq in map(lambda (day, iq), today=max(days): (today-day, iq), 
...         sorted(zip(days, IQ), key=lambda p: p[0], reverse=True)))
95.0

上面的不是很漂亮,所以让我们稍微重构一下:

from collections import namedtuple
from operator    import itemgetter

def smooth(iq_data, alpha=1, today=None):
    """Perform exponential smoothing with factor `alpha`.

    Time period is a day.
    Each time period the value of `iq` drops `alpha` times.
    The most recent data is the most valuable one.
    """
    assert 0 < alpha <= 1

    if alpha == 1: # no smoothing
        return sum(map(itemgetter(1), iq_data))

    if today is None:
        today = max(map(itemgetter(0), iq_data))

    return sum(alpha**((today - date).days) * iq for date, iq in iq_data)

IQData = namedtuple("IQData", "date iq")

if __name__ == "__main__":
    from datetime import date

    days = [date(2008,1,1), date(2008,1,2), date(2008,1,7)]
    IQ = [110, 105, 90]
    iqdata = list(map(IQData, days, IQ))
    print("\n".join(map(str, iqdata)))

    print(smooth(iqdata, alpha=0.5))

例子:

$ python26 smooth.py
IQData(date=datetime.date(2008, 1, 1), iq=110)
IQData(date=datetime.date(2008, 1, 2), iq=105)
IQData(date=datetime.date(2008, 1, 7), iq=90)
95.0
于 2009-01-28T19:15:34.593 回答
14

我做了一些谷歌搜索,发现了以下示例代码(http://osdir.com/ml/python.matplotlib.general/2005-04/msg00044.html):

def ema(s, n):
    """
    returns an n period exponential moving average for
    the time series s

    s is a list ordered from oldest (index 0) to most
    recent (index -1)
    n is an integer

    returns a numeric array of the exponential
    moving average
    """
    s = array(s)
    ema = []
    j = 1

    #get n sma first and calculate the next n period ema
    sma = sum(s[:n]) / n
    multiplier = 2 / float(1 + n)
    ema.append(sma)

    #EMA(current) = ( (Price(current) - EMA(prev) ) x Multiplier) + EMA(prev)
    ema.append(( (s[n] - sma) * multiplier) + sma)

    #now calculate the rest of the values
    for i in s[n+1:]:
        tmp = ( (i - ema[j]) * multiplier) + ema[j]
        j = j + 1
        ema.append(tmp)

    return ema
于 2009-01-28T18:12:07.030 回答
13

我总是用 Pandas 计算 EMA:

这是一个如何做到这一点的例子:

import pandas as pd
import numpy as np

def ema(values, period):
    values = np.array(values)
    return pd.ewma(values, span=period)[-1]

values = [9, 5, 10, 16, 5]
period = 5

print ema(values, period)

关于 Pandas EWMA 的更多信息:

http://pandas.pydata.org/pandas-docs/stable/generated/pandas.ewma.html

于 2015-10-04T12:42:55.487 回答
7

您还可以使用 SciPy 过滤器方法,因为 EMA 是 IIR 过滤器。与enumerate()方法相比,这将具有在我的系统上使用timeit在大型数据集上测量的速度大约 64 倍的好处。

import numpy as np
from scipy.signal import lfilter

x = np.random.normal(size=1234)
alpha = .1 # smoothing coefficient
zi = [x[0]] # seed the filter state with first value
# filter can process blocks of continuous data if <zi> is maintained
y, zi = lfilter([1.-alpha], [1., -alpha], x, zi=zi)
于 2017-03-10T17:31:16.263 回答
6

我不知道 Python,但对于平均部分,你的意思是指数衰减的低通滤波器的形式

y_new = y_old + (input - y_old)*alpha

其中 alpha = dt/tau,dt = 滤波器的时间步长,tau = 滤波器的时间常数?(这个的变时间步的形式如下,只要把dt/tau剪成不大于1.0)

y_new = y_old + (input - y_old)*dt/tau

如果要过滤日期之类的内容,请确保转换为浮点数,例如自 1970 年 1 月 1 日以来的秒数。

于 2009-01-28T18:10:09.187 回答
5

我的 python 有点生疏(如果我以某种方式弄乱了语法,任何人都可以随意编辑此代码以进行更正),但这里是......

def movingAverageExponential(values, alpha, epsilon = 0):

   if not 0 < alpha < 1:
      raise ValueError("out of range, alpha='%s'" % alpha)

   if not 0 <= epsilon < alpha:
      raise ValueError("out of range, epsilon='%s'" % epsilon)

   result = [None] * len(values)

   for i in range(len(result)):
       currentWeight = 1.0

       numerator     = 0
       denominator   = 0
       for value in values[i::-1]:
           numerator     += value * currentWeight
           denominator   += currentWeight

           currentWeight *= alpha
           if currentWeight < epsilon: 
              break

       result[i] = numerator / denominator

   return result

此函数向后移动,从列表的末尾到开头,通过向后工作计算每个值的指数移动平均值,直到元素的权重系数小于给定的 epsilon。

在函数结束时,它会在返回列表之前反转值(以便调用者的顺序正确)。

(旁注:如果我使用的是 python 以外的语言,我会先创建一个全尺寸的空数组,然后按倒序填充它,这样我就不必在最后反转它。但我不'认为你不能在 python 中声明一个大的空数组。在 python 列表中,追加比前置要便宜得多,这就是我以相反顺序构建列表的原因。如果我错了,请纠正我。)

'alpha' 参数是每次迭代的衰减因子。例如,如果您使用 0.5 的 alpha,那么今天的移动平均值将由以下加权值组成:

today:        1.0
yesterday:    0.5
2 days ago:   0.25
3 days ago:   0.125
...etc...

当然,如果你有大量的值,那么十天或十五天前的值对今天的加权平均值贡献不大。'epsilon' 参数让您设置一个截止点,低于该截止点​​您将不再关心旧值(因为它们对今天的价值的贡献将是微不足道的)。

你会像这样调用函数:

result = movingAverageExponential(values, 0.75, 0.0001)
于 2009-01-28T18:46:59.893 回答
5

在 matplotlib.org 示例 ( http://matplotlib.org/examples/pylab_examples/finance_work2.html ) 中提供了一个使用 numpy 的指数移动平均 (EMA) 函数的好例子:

def moving_average(x, n, type):
    x = np.asarray(x)
    if type=='simple':
        weights = np.ones(n)
    else:
        weights = np.exp(np.linspace(-1., 0., n))

    weights /= weights.sum()

    a =  np.convolve(x, weights, mode='full')[:len(x)]
    a[:n] = a[n]
    return a
于 2014-06-25T00:38:56.737 回答
3

我发现@earino 的上述代码片段非常有用 - 但我需要一些可以持续平滑值流的东西 - 所以我将它重构为:

def exponential_moving_average(period=1000):
    """ Exponential moving average. Smooths the values in v over ther period. Send in values - at first it'll return a simple average, but as soon as it's gahtered 'period' values, it'll start to use the Exponential Moving Averge to smooth the values.
    period: int - how many values to smooth over (default=100). """
    multiplier = 2 / float(1 + period)
    cum_temp = yield None  # We are being primed

    # Start by just returning the simple average until we have enough data.
    for i in xrange(1, period + 1):
        cum_temp += yield cum_temp / float(i)

    # Grab the timple avergae
    ema = cum_temp / period

    # and start calculating the exponentially smoothed average
    while True:
        ema = (((yield ema) - ema) * multiplier) + ema

我像这样使用它:

def temp_monitor(pin):
    """ Read from the temperature monitor - and smooth the value out. The sensor is noisy, so we use exponential smoothing. """
    ema = exponential_moving_average()
    next(ema)  # Prime the generator

    while True:
        yield ema.send(val_to_temp(pin.read()))

(其中 pin.read() 产生我想使用的下一个值)。

于 2014-02-12T20:35:05.880 回答
2

这是我基于http://stockcharts.com/school/doku.php?id=chart_school:technical_indicators:moving_averages处理的一个简单示例

请注意,与他们的电子表格不同,我不计算 SMA,也不会等待 10 个样本后生成 EMA。这意味着我的值略有不同,但如果你绘制它,它会在 10 个样本之后完全遵循。在前 10 个样本中,我计算的 EMA 被适当地平滑了。

def emaWeight(numSamples):
    return 2 / float(numSamples + 1)

def ema(close, prevEma, numSamples):
    return ((close-prevEma) * emaWeight(numSamples) ) + prevEma

samples = [
22.27, 22.19, 22.08, 22.17, 22.18, 22.13, 22.23, 22.43, 22.24, 22.29,
22.15, 22.39, 22.38, 22.61, 23.36, 24.05, 23.75, 23.83, 23.95, 23.63,
23.82, 23.87, 23.65, 23.19, 23.10, 23.33, 22.68, 23.10, 22.40, 22.17,
]
emaCap = 10
e=samples[0]
for s in range(len(samples)):
    numSamples = emaCap if s > emaCap else s
    e =  ema(samples[s], e, numSamples)
    print e
于 2015-08-12T03:00:42.980 回答
2

可能最短:

#Specify decay in terms of span
#data_series should be a DataFrame

ema=data_series.ewm(span=5, adjust=False).mean()

于 2019-10-03T05:12:07.567 回答
0

一种快速方法(从此处复制粘贴)如下:

def ExpMovingAverage(values, window):
    """ Numpy implementation of EMA
    """
    weights = np.exp(np.linspace(-1., 0., window))
    weights /= weights.sum()
    a =  np.convolve(values, weights, mode='full')[:len(values)]
    a[:window] = a[window]
    return a
于 2017-11-28T20:21:08.263 回答
0

我使用列表和衰减率作为输入。考虑到深度递归在 python 中不稳定,我希望这个只有两行代码的小函数可以帮助你。

def expma(aseries, ratio):
    return sum([ratio*aseries[-x-1]*((1-ratio)**x) for x in range(len(aseries))])
于 2018-07-19T06:48:27.587 回答
0

更简单地说,使用熊猫

def EMA(tw):
    for x in tw:
        data["EMA{}".format(x)] = data['close'].ewm(span=x, adjust=False).mean()
        EMA([10,50,100])
于 2019-09-28T10:40:10.810 回答
0

Papahaba 的答案几乎就是我想要的(谢谢!),但我需要匹配初始条件。使用 IIR 滤波器scipy.signal.lfilter当然是最有效的。这是我的还原:

给定一个 NumPy 向量,x

import numpy as np
from scipy import signal

period = 12
b = np.array((1,), 'd')
a = np.array((period, 1-period), 'd')
zi = signal.lfilter_zi(b, a)
y, zi = signal.lfilter(b, a, x, zi=zi*x[0:1])

获取向量中返回的 N 点 EMA(此处为 12)y

于 2020-10-02T02:29:04.180 回答
0

我在这里参加聚会有点晚了,但是给出的解决方案都不是我想要的。使用递归和投资百科中给出的确切公式是一个不错的小挑战。不需要 numpy 或 pandas。

prices = [{'i': 1, 'close': 24.5}, {'i': 2, 'close': 24.6}, {'i': 3, 'close': 24.8}, {'i': 4, 'close': 24.9},
          {'i': 5, 'close': 25.6}, {'i': 6, 'close': 25.0}, {'i': 7, 'close': 24.7}]


def rec_calculate_ema(n):
    k = 2 / (n + 1)
    price = prices[n]['close']
    if n == 1:
        return price
    res = (price * k) + (rec_calculate_ema(n - 1) * (1 - k))
    return res


print(rec_calculate_ema(3))
于 2021-07-29T00:26:33.350 回答
0
import pandas_ta as ta

data["EMA3"] = ta.ema(data["close"], length=3)
于 2022-02-24T11:03:55.713 回答