0

使用 RX 时扩展可用运算符的正确方法是什么?

我想构建一些我认为有用的操作。

第一个操作只是一个系列的标准差。

第二个操作是第 n 个滞后值,即如果我们滞后 2 并且我们的系列是 ABCDEF 当 F 被推动时滞后将为 D 当 A 被推动时滞后将为空/空当 B 被推动时滞后将为空/空当 C 被推动时,滞后将是 A

将这些类型的运算符基于 rx.codeplex.com 的内置函数是否有意义,还是有更简单的方法?

4

3 回答 3

1

在惯用的 Rx 中,任意延迟可以由Zip.

let lag (count : int) o = 
    let someo = Observable.map Some o
    let delayed = Observable.Repeat(None, count).Concat(someo)        
    Observable.Zip(someo, delayed, (fun c d -> d))    

至于滚动缓冲区,最有效的方法是简单地使用固定大小的Queue/ ResizeArray

let rollingBuffer (size : int) o = 
    Observable.Create(fun (observer : IObserver<_>) -> 
    let buffer = new Queue<_>(size)
    o |> Observable.subscribe(fun v -> 
            buffer.Enqueue(v)
            if buffer.Count = size then
                observer.OnNext(buffer.ToArray())
                buffer.Dequeue() |> ignore
        )
    )

对于numbers |> rollingBuffer 3 |> log

seq [0L; 1L; 2L]
seq [1L; 2L; 3L]
seq [2L; 3L; 4L]
seq [3L; 4L; 5L]
...

对于配对相邻值,您可以使用Observable.pairwise

let delta (a, b) = b - a
let deltaStream = numbers |>  Observable.pairwise |> Observable.map(delta) 

Observable.Scan如果要应用滚动计算,则更简洁。

于 2012-12-06T17:30:33.570 回答
1

其中一些比其他更容易(像往常一样)。对于计数(而不是时间)的“滞后”,您只需使用相当于“滞后”大小的 Observable.Buffer 创建一个滑动窗口,然后获取结果列表的第一个元素。

到目前为止lag = 3,函数是:

obs.Buffer(3,1).Select(l => l.[0])

这很容易变成一个扩展函数。我不知道它是否有效,因为它重用了相同的列表,但在大多数情况下,这无关紧要。我知道你想要 F#,翻译很简单。

对于运行聚合,您通常可以使用Observable.Scan来获取“运行”值。这是根据迄今为止看到的所有值计算的(并且实现起来非常简单) - 即,您必须实现的每个后续元素都是先前的聚合和新元素。

如果出于某种原因您需要基于滑动窗口的运行聚合,那么我们将进入更困难的领域。在这里你首先需要一个可以给你一个滑动窗口的操作——这在上面的 Buffer 中已经涵盖了。但是,您需要知道哪些值已从此窗口中删除,哪些已添加

因此,我推荐一个新的 Observable 函数,它基于现有窗口 + 新值维护一个内部窗口,并返回新窗口 + 删除值 + 附加值。您可以使用 Observable.Scan 编写此代码(我建议使用内部队列以实现高效)。它应该采用一个函数来确定在给定新值的情况下要删除哪些值(这样可以将其参数化为按时间或按计数滑动)。

此时,Observable.Scan 可以再次用于获取旧聚合 + 窗口 + 删除值 + 附加值并提供新聚合。

希望这会有所帮助,我确实意识到这是很多话。如果您可以确认需求,我可以为该特定用例提供实际的扩展方法。

于 2012-12-06T06:32:10.000 回答
0

因为lag,你可以做类似的事情

module Observable =
  let lag n obs =
    let buf = System.Collections.Generic.Queue()
    obs |> Observable.map (fun x ->
      buf.Enqueue(x)
      if buf.Count > n then Some(buf.Dequeue())
      else None)

这个:

Observable.Range(1, 9) 
  |> Observable.lag 2 
  |> Observable.subscribe (printfn "%A") 
  |> ignore

印刷:

<null>
<null>
Some 1
Some 2
Some 3
Some 4
Some 5
Some 6
Some 7
于 2012-12-06T15:22:11.867 回答