6

给定一个表格,其中第一列是某个参考点后的秒数,第二列是任意测量值:

6   0.738158581
21  0.801697222
39  1.797224596
49  2.77920469
54  2.839757536
79  3.832232283
91  4.676794376
97  5.18244704
100 5.521878863
118 6.316630137
131 6.778507504
147 7.020395216
157 7.331607129
176 7.637492223
202 7.848079136
223 7.989456499
251 8.76853608
278 9.092367123 
    ...

如您所见,测量是在不规则的时间点进行采样的。我需要通过平均每次测量前 100 秒的读数(在 Python 中)来平滑数据。由于数据表很大,因此确实首选基于迭代器的方法。不幸的是,经过两个小时的编码,我无法找到有效而优雅的解决方案。

谁能帮我?

编辑_

  1. 我希望每个原始读数都有一个平滑读数,并且平滑读数是原始读数和前 100(增量)秒内任何其他读数的算术平均值。(约翰,你是对的)

  2. 巨大的 ~ 1e6 - 10e6 行 + 需要使用紧凑的 RAM

  3. 数据近似随机游走

  4. 数据已排序

解析度

我已经测试了 J Machin 和 yairchu 提出的解决方案。他们都给出了相同的结果,但是,在我的数据集上,J Machin 的版本呈指数增长,而 yairchu 的版本是线性的。以下是由 IPython 的%timeit测量的执行时间(以微秒为单位):

data size   J Machin    yairchu
10        90.2        55.6
50          930         258
100         3080        514
500         64700       2660
1000        253000      5390
2000        952000      11500

谢谢大家的帮助。

4

8 回答 8

3

我正在使用一个求和结果,我将添加新成员并减去旧成员。然而,以这种方式,人们可能会遭受累积的浮点不准确性。

因此我用一个列表实现了一个“双端队列”。每当我的双端队列重新分配到更小的大小时。我在同一场合重新计算总和。

我还在计算直到点 x 的平均值,包括点 x,所以至少有一个样本点需要平均。

def getAvgValues(data, avgSampleTime):
  lastTime = 0
  prevValsBuf = []
  prevValsStart = 0
  tot = 0
  for t, v in data:
    avgStart = t - avgSampleTime
    # remove too old values
    while prevValsStart < len(prevValsBuf):
      pt, pv = prevValsBuf[prevValsStart]
      if pt > avgStart:
        break
      tot -= pv
      prevValsStart += 1
    # add new item
    tot += v
    prevValsBuf.append((t, v))
    # yield result
    numItems = len(prevValsBuf) - prevValsStart
    yield (t, tot / numItems)
    # clean prevVals if it's time
    if prevValsStart * 2 > len(prevValsBuf):
      prevValsBuf = prevValsBuf[prevValsStart:]
      prevValsStart = 0
      # recalculate tot for not accumulating float precision error
      tot = sum(v for (t, v) in prevValsBuf)
于 2009-06-21T13:17:51.487 回答
2

你还没有确切地说出你想要输出的时间。我假设您希望每个原始读数都有一个平滑读数,并且平滑读数是原始读数和前 100(增量)秒内的任何其他读数的算术平均值。

简短的回答:使用 collections.deque ......它的读数永远不会超过“delta”秒。按照我的设置方式,您可以将 deque 视为列表,并轻松计算平均值或一些花哨的 gizmoid,这些 gizmoid 对最近的读数具有更大的权重。

长答案:

>>> the_data = [tuple(map(float, x.split())) for x in """\
... 6       0.738158581
... 21      0.801697222
[snip]
... 251     8.76853608
... 278     9.092367123""".splitlines()]
>>> import collections
>>> delta = 100.0
>>> q = collections.deque()
>>> for t, v in the_data:
...     while q and q[0][0] <= t - delta:
...         # jettison outdated readings
...         _unused = q.popleft()
...     q.append((t, v))
...     count = len(q)
...     print t, sum(item[1] for item in q) / count, count
...
...
6.0 0.738158581 1
21.0 0.7699279015 2
39.0 1.112360133 3
49.0 1.52907127225 4
54.0 1.791208525 5
79.0 2.13137915133 6
91.0 2.49500989771 7
97.0 2.8309395405 8
100.0 3.12993279856 9
118.0 3.74976297144 9
131.0 4.41385300278 9
147.0 4.99420529389 9
157.0 5.8325615685 8
176.0 6.033109419 9
202.0 7.15545189083 6
223.0 7.4342562845 6
251.0 7.9150342134 5
278.0 8.4246097095 4
>>>

编辑

一站式商店:在这里获取您喜欢的小工具。这是代码:

numerator = sum(item[1] * upsilon ** (t - item[0]) for item in q)
denominator = sum(upsilon ** (t - item[0]) for item in q)
gizmoid = numerator / denominator

其中 upsilon 应该略小于 1.0(<= 零是非法的,略高于零几乎没有平滑,一个得到算术平均值加上浪费的 CPU 时间,大于 1 则与您的目的相反)。

于 2009-06-21T12:48:04.383 回答
0

您的数据似乎大致呈线性:

绘制您的数据 http://rix0r.nl/~rix0r/share/shot-20090621.144851.gif

你在寻找什么样的平滑?一条线与该数据集的最小二乘拟合?某种低通滤波器?或者是其他东西?

请告诉我们应用程序,以便我们可以为您提供更好的建议。

编辑:例如,根据应用程序,在第一个点和最后一个点之间插入一条线可能足以满足您的目的。

于 2009-06-21T12:50:36.250 回答
0

这使它成为线性:

def process_data(datafile):
    previous_n = 0
    previous_t = 0
    for line in datafile:
        t, number = line.strip().split()
        t = int(t)
        number = float(number)
        delta_n = number - previous_n
        delta_t = t - previous_t
        n_per_t = delta_n / delta_t
        for t0 in xrange(delta_t):
            yield previous_t + t0, previous_n + (n_per_t * t0)
        previous_n = n
        previous_t = t

f = open('datafile.dat')

for sample in process_data(f):
    print sample
于 2009-06-21T14:02:32.080 回答
0

O(1) 内存,以防您可以多次迭代输入 - 您可以将一个迭代器用于“左”,一个用于“右”。

def getAvgValues(makeIter, avgSampleTime):
  leftIter = makeIter()
  leftT, leftV = leftIter.next()
  tot = 0
  count = 0
  for rightT, rightV in makeIter():
    tot += rightV
    count += 1
    while leftT <= rightT - avgSampleTime:
      tot -= leftV
      count -= 1
      leftT, leftV = leftIter.next()
    yield rightT, tot / count
于 2009-06-21T14:46:42.460 回答
0

虽然它给出了一个指数衰减的平均值,而不是一个总平均值,但我认为你可能想要我所说的具有不同 alpha 的指数移动平均值,它实际上是一个单极低通滤波器。这个问题现在有了一个解决方案,它的运行时间与数据点的数量成线性关系。看看它是否适合你。

于 2009-06-23T00:51:54.027 回答
-1

像这样的东西呢,继续存储值直到与上次的时间差> 100,平均并产生这样的值,例如

def getAvgValues(data):
    lastTime = 0
    prevValues = []
    avgSampleTime=100

    for t, v in data:
        if t - lastTime < avgSampleTime:
            prevValues.append(v)
        else:
            avgV = sum(prevValues)/len(prevValues)
            lastTime = t
            prevValues = [v]
            yield (t,avgV)

for v in getAvgValues(data):
    print v
于 2009-06-21T11:50:55.920 回答
-2

听起来你需要一个简单的舍入公式。要将任何数字四舍五入到任意间隔:

轮次(次数/间隔)*间隔

您可以用地板或天花板代替圆形来代替“导致”或“因为”影响。它可以用任何语言工作,包括 SQL。

于 2009-06-21T14:13:38.657 回答