0

假设一个给定的迭代器产生一个浮点值和一个时间戳或等价物,我有一个处理器类接收该事件并使用新值更新自身,将这些存储在类似双端队列的数据结构中。

从那个双端队列中,我们可以假设很容易获得第一个/最后一个/最小值和最大值(尽管一种优雅而快速的方法可能具有挑战性,但这不是这里的问题)。我们称其为 OHLC 条。

记住以下约束,如何存储和动态更新最后的 n * OHLC 条:

  • OHLC 的持续时间固定为一个参数,并且对于所有 OHLC 都是相同的
  • 大小 (n) 作为 OHLC 的数量是一个固定的整数参数
  • 事件到达时间是随机的且间隔不均匀,而且可能会丢失一些数据点(即我们不会收到任何更新的时间间隔)
  • 目标是“在线”计算它:我认为它可以在熊猫/重采样中轻松完成,但其中的乐趣在哪里。
  • 我目前正在用 Python 编码

到目前为止,我有一个基于以下函数的固定持续时间窗口构建器类

return floor((dtime - dt.datetime.utcfromtimestamp(0)).total_seconds())

还有一个双端队列数据结构类型,它允许我使用这种算法来存储将服务于生成 1 个 OHLC 条的值。

self._values = collections.deque()

def update(self, x):
    self._start_idx = to_sec(x.dt) - self._duration_in_seconds
    self._end_idx = to_sec(x.dt)
   
    if self._first_run:
        self._first_run = False
        self._values.append(x)
        return
    
    value = self._values[0]
    while to_sec(value.dt) <= self._start_idx:
        self._values.popleft()
        value = self._values[0]
        
    self._values.append(x)

但是,我被困在如何在每次更新时存储这些让我们说 10 个 OHLC 条,并在每个将触发更新功能的新事件中使整个事物“滑动”。

4

0 回答 0