如何在 RX 中对序列进行简单的、有状态的转换?
假设我们想要对 IObservable 噪声序列进行指数移动平均变换。
每当噪声序列滴答时,emaSequence 应该滴答并返回值 (previousEmaSequenceValue*(1-lambda) + latestNoisySequenceValue*lambda)
我想我们使用主题,但究竟如何?
public static void Main()
{
var rand = new Random();
IObservable<double> sequence = Observable
.Interval(TimeSpan.FromMilliseconds(1000))
.Select(value => value + rand.NextDouble());
Func<double, double> addNoise = x => x + 10*(rand.NextDouble() - 0.5);
IObservable<double> noisySequence = sequence.Select(addNoise);
Subject<double> exponentialMovingAverage = new Subject<double>(); // ???
sequence.Subscribe(value => Console.WriteLine("original sequence "+value));
noisySequence.Subscribe(value => Console.WriteLine("noisy sequence " + value));
exponentialMovingAverage.Subscribe(value => Console.WriteLine("ema sequence " + value));
Console.ReadLine();
}