更新
看起来 Jon Skeet 是对的(大惊喜!),问题在于我对Average
扩展提供连续平均值的假设(它没有)。
对于我所追求的行为,我编写了一个简单的ContinuousAverage
扩展方法,我在这里包括了它的实现,以便其他可能想要类似东西的人受益:
public static class ObservableExtensions {
private class ContinuousAverager {
private double _mean;
private long _count;
public ContinuousAverager() {
_mean = 0.0;
_count = 0L;
}
// undecided whether this method needs to be made thread-safe or not
// seems that ought to be the responsibility of the IObservable (?)
public double Add(double value) {
double delta = value - _mean;
_mean += (delta / (double)(++_count));
return _mean;
}
}
public static IObservable<double> ContinousAverage(this IObservable<double> source) {
var averager = new ContinuousAverager();
return source.Select(x => averager.Add(x));
}
}
我正在考虑继续为其他明显的候选人做类似上述的事情——所以, ContinuousCount
, ContinuousSum
, ContinuousMin
, ContinuousMax
... 也许ContinuousVariance
还有ContinuousStandardDeviation
?对此有什么想法吗?
原始问题
我到处使用Rx Extensions ,感觉我已经掌握了基本的想法。
现在有些奇怪:我的印象是,如果我写了这个:
var ticks = Observable.FromEvent<QuoteEventArgs>(MarketDataProvider, "MarketTick");
var bids = ticks
.Where(e => e.EventArgs.Quote.HasBid)
.Select(e => e.EventArgs.Quote.Bid);
var bidsSubscription = bids.Subscribe(
b => Console.WriteLine("Bid: {0}", b)
);
var avgOfBids = bids.Average();
var avgOfBidsSubscription = avgOfBids.Subscribe(
b => Console.WriteLine("Avg Bid: {0}", b)
);
我会得到两个IObservable<double>
对象(bids
和avgOfBids
);一个基本上是来自 my 的所有市场出价流MarketDataProvider
,另一个是这些出价的平均值流。
所以是这样的:
Bid Avg Bid
1 1
2 1.5
1 1.33
2 1.5
看来我的avgOfBids
对象没有做任何事情。我错过了什么?我想我可能误解了Average
实际应该做什么。(这似乎也适用于所有类似聚合的扩展方法IObservable<T>
——例如Max
,、Count
等)