3

我仍然开始使用 ReactiveCocoa 和函数式反应式编程概念,所以也许这是一个愚蠢的问题。

ReactiveCocoa 似乎很自然地设计为对实时数据流、触摸事件或加速度计传感器输入等做出反应。

是否可以在 ReactiveCocoa 中以一种简单的反应方式应用有限脉冲响应滤波器?或者如果不是,那么做这件事的最不丑陋的黑客方式是什么?一个人将如何去实现一个简单的移动平均线之类的东西?

理想情况下正在寻找 Swift 2 + RA4 解决方案,但也对这在 Objective C 和 RA2/RA3 中是否可行感兴趣。

4

2 回答 2

4

您实际需要的是某种周期缓冲区,它将保持一段时间的缓冲值,并且仅在缓冲区达到容量时才开始发送(下面的代码是对 takeLast 运算符的天启)

extension SignalType {
    func periodBuffer(period:Int) -> Signal<[Value], Error> {
        return Signal { observer in
            var buffer: [Value] = []
            buffer.reserveCapacity(period)

            return self.observe { event in
                switch event {
                case let .Next(value):
                    // To avoid exceeding the reserved capacity of the buffer, we remove then add.
                    // Remove elements until we have room to add one more.
                    while (buffer.count + 1) > period {
                        buffer.removeAtIndex(0)
                    }

                    buffer.append(value)

                    if buffer.count == period {
                        observer.sendNext(buffer)
                    }
                case let .Failed(error):
                    observer.sendFailed(error)
                case .Completed:
                    observer.sendCompleted()
                case .Interrupted:
                    observer.sendInterrupted()
                }
            }
        }
    }
}

基于此,您可以将其映射到您想要的任何算法

let pipe = Signal<Int,NoError>.pipe()

pipe.0
    .periodBuffer(3)
    .map { Double($0.reduce(0, combine: +))/Double($0.count) } // simple moving average
    .observeNext { print($0) }

pipe.1.sendNext(10) // does nothing
pipe.1.sendNext(11) // does nothing
pipe.1.sendNext(15) // prints 12
pipe.1.sendNext(7) // prints 11
pipe.1.sendNext(9) // prints 10.3333
pipe.1.sendNext(6) // prints 7.3333
于 2016-01-13T13:56:06.043 回答
1

可能scan信号运算符是您正在寻找的。受安迪雅各布斯回答的启发,我想出了这样的东西(一个简单的移动平均线实现):

  let (signal, observer) = Signal<Int,NoError>.pipe()

  let maxSamples = 3

  let movingAverage = signal.scan( [Int]() ) { (previousSamples, nextValue)  in 
    let samples : [Int] =  previousSamples.count < maxSamples ? previousSamples : Array(previousSamples.dropFirst())
    return samples + [nextValue]
  }
  .filter { $0.count >= maxSamples }
  .map { $0.average }

  movingAverage.observeNext { (next) -> () in
    print("Next: \(next)")
  }

  observer.sendNext(1)
  observer.sendNext(2)
  observer.sendNext(3)
  observer.sendNext(4)
  observer.sendNext(42)

注意:我必须将average方法移动到协议扩展中,否则编译器会抱怨表达式太复杂。我从这个答案中使用了一个很好的解决方案:

extension Array where Element: IntegerType {
    var total: Element {
        guard !isEmpty else { return 0 }
        return reduce(0){$0 + $1}
    }
    var average: Double {
        guard let total = total as? Int where !isEmpty else { return 0 }
        return Double(total)/Double(count)
    }
}
于 2016-01-16T13:30:13.107 回答