3

更新

看起来 Jon Skeet 是对的(大惊喜!),问题在于我对Average扩展提供连续平均值的假设(它没有)。

对于我所追求的行为,我编写了一个简单的ContinuousAverage扩展方法,我在这里包括了它的实现,以便其他可能想要类似东西的人受益:

public static class ObservableExtensions {
    private class ContinuousAverager {
            private double _mean;
            private long _count;

        public ContinuousAverager() {
            _mean = 0.0;
            _count = 0L;
        }

        // undecided whether this method needs to be made thread-safe or not
        // seems that ought to be the responsibility of the IObservable (?)
        public double Add(double value) {
            double delta = value - _mean;
            _mean += (delta / (double)(++_count));
            return _mean;
        }
    }

    public static IObservable<double> ContinousAverage(this IObservable<double> source) {
        var averager = new ContinuousAverager();

        return source.Select(x => averager.Add(x));
    }
}

我正在考虑继续为其他明显的候选人做类似上述的事情——所以, ContinuousCount, ContinuousSum, ContinuousMin, ContinuousMax... 也许ContinuousVariance还有ContinuousStandardDeviation?对此有什么想法吗?


原始问题

我到处使用Rx Extensions ,感觉我已经掌握了基本的想法。

现在有些奇怪:我的印象是,如果我写了这个:

var ticks = Observable.FromEvent<QuoteEventArgs>(MarketDataProvider, "MarketTick");

var bids = ticks
    .Where(e => e.EventArgs.Quote.HasBid)
    .Select(e => e.EventArgs.Quote.Bid);

var bidsSubscription = bids.Subscribe(
    b => Console.WriteLine("Bid: {0}", b)
);

var avgOfBids = bids.Average();
var avgOfBidsSubscription = avgOfBids.Subscribe(
    b => Console.WriteLine("Avg Bid: {0}", b)
);

我会得到两个IObservable<double>对象(bidsavgOfBids);一个基本上是来自 my 的所有市场出价流MarketDataProvider,另一个是这些出价的平均值流。

所以是这样的:

Bid    Avg Bid
1      1
2      1.5
1      1.33
2      1.5

看来我的avgOfBids对象没有做任何事情。我错过了什么?我想我可能误解了Average实际应该做什么。(这似乎也适用于所有类似聚合的扩展方法IObservable<T>——例如Max,、Count等)

4

3 回答 3

3

另一种实现 ContinousAverage 的方法是使用 .Scan():

bids.Scan(new { sum = .0, count = 0 }, 
          (agg, x) => new { sum = agg.sum + x, count = agg.count + 1 })
    .Select(agg => agg.sum / agg.count)
于 2010-05-09T05:41:35.257 回答
2

聚合方法不提供滚动最大值/最小值/计数/平均值等 - 它们在原始流完成时提供单个值。例如,如果您将代码更改为:

var avgOfBids = bids.Take(5).Average();
var avgOfBidsSubscription = avgOfBids.Subscribe(
    b => Console.WriteLine("Avg Bid: {0}", b)
);

然后,一旦出价 5 次,它将显示它们的平均值。然后“平均”流也将完成。

于 2010-05-07T20:07:59.563 回答
0

订阅出价事件流后;您已有效阻止任何其他观察者订阅(订阅返回 IDisposable。)

您必须定义另一个与刻度并行的 Observable,并订阅该 Observable 以对其进行平均。

var ticksToAverage = Observable.FromEvent<QuoteEventArgs>(MarketDataProvider, "MarketTick");

 var bidsToAverage = ticksToAverage
      .Where(e => e.EventArgs.Quote.HasBid)
         .Select(e => e.EventArgs.Quote.Bid);


var avgOfBids = bidsToAverage.Average();
var avgOfBidsSubscription = avgOfBids.Subscribe( b => Console.WriteLine("Avg Bid: {0}", b)
                ); 
于 2010-05-07T20:06:21.063 回答