作为记录,下面是我最终采用的做法。我仍在学习 Rx,而且上一次接触 .NET 还是 2.0 版本,如今才重新回来使用。非常感谢大家的所有反馈。
下面使用的 Ticks 对象可能包含一个或多个 tick(逐笔报价)值。历史数据服务会把数据分成多个 Ticks 对象返回。
/// <summary>
/// Price feed that concatenates a historical feed with a live feed into a single stream.
/// The live feed is buffered from the moment of subscription, the historical feed is
/// replayed first, and the buffered/live ticks follow once history completes.
/// </summary>
public class HistoricalAndLivePriceFeed : IPriceFeed
{
    private readonly IPriceFeed history;
    private readonly IPriceFeed live;
    private readonly IClock clock;

    /// <summary>
    /// Creates a feed over the given historical and live feeds using a <see cref="RealClock"/>.
    /// </summary>
    public HistoricalAndLivePriceFeed(IPriceFeed history, IPriceFeed live)
        : this(history, live, new RealClock())
    {
    }

    /// <summary>
    /// Creates a feed over the given historical and live feeds with an explicit clock.
    /// </summary>
    /// <param name="history">Feed queried for past ticks.</param>
    /// <param name="live">Feed that is buffered while history is being replayed.</param>
    /// <param name="clock">Supplies the cut-over timestamp and performs the overlap wait.</param>
    public HistoricalAndLivePriceFeed(IPriceFeed history, IPriceFeed live, IClock clock)
    {
        this.history = history;
        this.live = live;
        this.clock = clock;
    }

    /// <summary>
    /// Returns an observable that, on each subscription, emits historical ticks followed by live ticks.
    /// </summary>
    /// <param name="since">Start time passed through to both underlying feeds.</param>
    /// <param name="symbol">Symbol passed through to both underlying feeds.</param>
    /// <returns>
    /// A cold observable; disposing a subscription disposes both the live buffer and the
    /// combined history/live subscription.
    /// </returns>
    public IObservable<Ticks> For(DateTime since, ISymbol symbol)
    {
        return Observable.Create<Ticks>(observer =>
        {
            // Start capturing live ticks before anything else so none are missed.
            var liveStream = Buffer<Ticks>.StartBuffering(live.For(since, symbol));
            try
            {
                var definitelyInHistoricalTicks = clock.Now;
                var firstLiveOnlyTimestamp = definitelyInHistoricalTicks + 1;

                // Sleep to make sure that historical data overlaps our live data
                // If we ever use a data provider with less fresh historical data, we may need to rethink this
                clock.Wait(TimeSpan.FromSeconds(1));

                // Live side: drop batches that end at or before the cut-over, then trim the
                // first surviving batch (RemoveBefore presumably drops entries earlier than
                // the given timestamp - confirm against Ticks).
                var liveStreamAfterEndOfHistoricalTicks = liveStream
                    .SkipWhile(ticks => ticks.LastTimestamp <= definitelyInHistoricalTicks)
                    .Select(ticks => ticks.RemoveBefore(firstLiveOnlyTimestamp));

                // History side: trim at the same boundary so the two halves do not overlap.
                var subscription = history.For(since, symbol)
                    .Select(historicalTicks => historicalTicks.RemoveAtOrAfter(firstLiveOnlyTimestamp))
                    .Concat(liveStreamAfterEndOfHistoricalTicks)
                    .Subscribe(observer);

                return liveStream.And(subscription);
            }
            catch
            {
                // Without this the live subscription held by the buffer would leak
                // whenever the wait or the history subscription throws.
                liveStream.Dispose();
                throw;
            }
        });
    }
}
/// <summary>
/// Fluent helpers for pairing a disposable with a second clean-up step.
/// </summary>
public static class CompositeDisposableExtensions
{
    /// <summary>
    /// Combines <paramref name="disposable"/> with a disposable wrapper around <paramref name="action"/>.
    /// </summary>
    /// <param name="disposable">The first member of the resulting group.</param>
    /// <param name="action">Callback wrapped via <c>Disposable.Create</c> as the second member.</param>
    /// <returns>A <see cref="CompositeDisposable"/> holding both members.</returns>
    public static CompositeDisposable And(this IDisposable disposable, Action action)
    {
        IDisposable wrappedAction = Disposable.Create(action);
        return disposable.And(wrappedAction);
    }

    /// <summary>
    /// Combines two disposables into one group.
    /// </summary>
    /// <param name="disposable">The first member of the resulting group.</param>
    /// <param name="other">The second member of the resulting group.</param>
    /// <returns>A <see cref="CompositeDisposable"/> holding both members.</returns>
    public static CompositeDisposable And(this IDisposable disposable, IDisposable other)
    {
        var pair = new CompositeDisposable(disposable, other);
        return pair;
    }
}
它用到了下面这段 Rx 代码,不过我对这段代码仍然不太放心:
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Subjects;
namespace My.Rx
{
    /// <summary>
    /// Buffers values from an underlying observable when no observers are subscribed.
    /// 
    /// On Subscription, any buffered values will be replayed in arrival order, followed by
    /// the terminal notification (error or completion) if the underlying observable has
    /// already terminated.
    /// 
    /// Only supports one observer at a time. When that observer unsubscribes, the buffer
    /// goes back to queueing values for the next observer.
    /// 
    /// Buffer is an ISubject for convenience of implementation but IObserver methods
    /// are hidden. It is not intended that Buffer should be used as an IObserver,
    /// except through StartBuffering() and it is dangerous to do so because none of 
    /// the IObserver methods check whether Buffer has been disposed.
    /// </summary>
    /// <typeparam name="TSource">The type of the buffered elements.</typeparam>
    public class Buffer<TSource> : ISubject<TSource>, IDisposable
    {
        // Guards every field below; observer callbacks are invoked while it is held.
        private readonly object gate = new object();
        private readonly Queue<TSource> queue = new Queue<TSource>();
        private bool isDisposed;
        // Terminal error from the underlying observable, kept so later observers also see it.
        private Exception error;
        // True once the underlying observable has sent OnError or OnCompleted.
        private bool stopped;
        private IObserver<TSource> observer = null;
        private IDisposable subscription;

        /// <summary>
        /// Subscribes to <paramref name="observable"/> immediately and starts queueing its values.
        /// </summary>
        /// <param name="observable">The underlying observable to buffer.</param>
        /// <returns>A buffer that owns the subscription to <paramref name="observable"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observable"/> is null.</exception>
        public static Buffer<TSource> StartBuffering(IObservable<TSource> observable)
        {
            if (observable == null)
                throw new ArgumentNullException("observable");
            return new Buffer<TSource>(observable);
        }

        private Buffer(IObservable<TSource> observable)
        {
            subscription = observable.Subscribe(this);
        }

        /// <summary>Queues the value, or forwards it if an observer is attached.</summary>
        void IObserver<TSource>.OnNext(TSource value)
        {
            lock (gate)
            {
                if (stopped) return;
                if (IsBuffering)
                    queue.Enqueue(value);
                else
                    observer.OnNext(value);
            }
        }

        /// <summary>Records the terminal error and forwards it if an observer is attached.</summary>
        void IObserver<TSource>.OnError(Exception error)
        {
            lock (gate)
            {
                if (stopped) return;
                // Always remember the error (not only while buffering) so that observers
                // subscribing later are told about the failure instead of a completion.
                stopped = true;
                this.error = error;
                if (!IsBuffering)
                    observer.OnError(error);
            }
        }

        /// <summary>Records completion and forwards it if an observer is attached.</summary>
        void IObserver<TSource>.OnCompleted()
        {
            lock (gate)
            {
                if (stopped) return;
                stopped = true;
                // An attached observer must be told the stream ended; otherwise it
                // would wait forever for a notification that never arrives.
                if (!IsBuffering)
                    observer.OnCompleted();
            }
        }

        /// <summary>
        /// Attaches <paramref name="observer"/>, replaying queued values and any terminal notification first.
        /// </summary>
        /// <param name="observer">The observer to attach.</param>
        /// <returns>A handle that detaches the observer and puts the buffer back into buffering mode.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The buffer has been disposed.</exception>
        /// <exception cref="NotImplementedException">Another observer is already attached.</exception>
        public IDisposable Subscribe(IObserver<TSource> observer)
        {
            if (observer == null)
                throw new ArgumentNullException("observer");

            lock (gate)
            {
                if (isDisposed)
                    throw new ObjectDisposedException(string.Empty);
                if (this.observer != null)
                    throw new NotImplementedException("A Buffer can currently only support one observer at a time");

                while (queue.Count > 0)
                {
                    observer.OnNext(queue.Dequeue());
                }

                if (error != null)
                    observer.OnError(error);
                else if (stopped)
                    observer.OnCompleted();

                this.observer = observer;
                return Disposable.Create(() =>
                {
                    lock (gate)
                    {
                        // Go back to buffering, but only if this handle still belongs to the
                        // attached observer; a stale handle must not detach a newer observer.
                        if (ReferenceEquals(this.observer, observer))
                            this.observer = null;
                    }
                });
            }
        }

        private bool IsBuffering
        {
            get { return observer == null; }
        }

        /// <summary>
        /// Disposes the subscription to the underlying observable and detaches any observer.
        /// Safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            lock (gate)
            {
                if (isDisposed) return;
                isDisposed = true;
                if (subscription != null)
                {
                    subscription.Dispose();
                    subscription = null;
                }
                observer = null;
            }
        }
    }
}
它通过了这些测试(我还没有费心检查线程安全):
// Exception instance pushed through the underlying subject by the error tests,
// which then assert that this same instance reaches the observer.
private static readonly Exception exceptionThrownFromUnderlying = new Exception("Hello world");
// Values emitted before anyone subscribes must be replayed, in order, to the first subscriber.
[Test]
public void ReplaysBufferedValuesToFirstSubscriber()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);
    underlying.OnNext(1);
    underlying.OnNext(2);
    var observed = new List<int>();
    buffer.Subscribe(Observer.Create<int>(observed.Add));
    // EqualTo (unlike EquivalentTo) also verifies that replay order is preserved.
    Assert.That(observed, Is.EqualTo(new[] { 1, 2 }));
}
// Values emitted while an observer is attached must be forwarded to it, in order.
[Test]
public void PassesNewValuesToObserver()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);
    var observed = new List<int>();
    buffer.Subscribe(Observer.Create<int>(observed.Add));
    underlying.OnNext(1);
    underlying.OnNext(2);
    // EqualTo (unlike EquivalentTo) also verifies that emission order is preserved.
    Assert.That(observed, Is.EqualTo(new[] { 1, 2 }));
}
// An observer that has unsubscribed must not receive values emitted afterwards.
[Test]
public void DisposesOfSubscriptions()
{
    var received = new List<int>();
    var source = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(source);

    // Attach and immediately detach.
    IDisposable handle = buffer.Subscribe(Observer.Create<int>(received.Add));
    handle.Dispose();

    source.OnNext(1);

    Assert.That(received, Is.Empty);
}
// After the first observer unsubscribes, new values must be queued and delivered
// (in order) to the next observer, without repeating what the first one saw.
[Test]
public void StartsBufferingAgainWhenSubscriptionIsDisposed()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);
    // These should be buffered
    underlying.OnNext(1);
    underlying.OnNext(2);
    var firstSubscriptionObserved = new List<int>();
    using (buffer.Subscribe(Observer.Create<int>(firstSubscriptionObserved.Add)))
    {
        // Should be passed through to first subscription
        underlying.OnNext(3);
    }
    // EqualTo (unlike EquivalentTo) also verifies ordering of replayed and live values.
    Assert.That(firstSubscriptionObserved, Is.EqualTo(new[] { 1, 2, 3 }));
    // First subscription has been disposed-
    // we should be back to buffering again
    underlying.OnNext(4);
    underlying.OnNext(5);
    var secondSubscriptionObserved = new List<int>();
    using (buffer.Subscribe(Observer.Create<int>(secondSubscriptionObserved.Add)))
    {
        // Should be passed through to second subscription
        underlying.OnNext(6);
    }
    Assert.That(secondSubscriptionObserved, Is.EqualTo(new[] { 4, 5, 6 }));
}
// A second Subscribe while an observer is still attached must be rejected.
[Test]
public void DoesNotSupportTwoConcurrentObservers()
{
    // Use .Publish() if you need to do this
    var source = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(source);
    var firstObserver = Observer.Create<int>(_ => { });
    var secondObserver = Observer.Create<int>(_ => { });

    buffer.Subscribe(firstObserver);

    Assert.Throws<NotImplementedException>(() => buffer.Subscribe(secondObserver));
}
// Subscribing to a disposed buffer must throw ObjectDisposedException.
[Test]
public void CannotBeUsedAfterDisposal()
{
    var source = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(source);
    var lateObserver = Observer.Create<int>(_ => { });

    buffer.Dispose();

    Assert.Throws<ObjectDisposedException>(() => buffer.Subscribe(lateObserver));
}
// An error raised before anyone subscribes must be replayed after the buffered values.
[Test]
public void ReplaysBufferedError()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);
    underlying.OnNext(1);
    underlying.OnError(exceptionThrownFromUnderlying);
    var observed = new List<int>();
    Exception foundException = null;
    buffer.Subscribe(
        observed.Add,
        e => foundException = e);
    // EqualTo keeps the assertion order-sensitive, matching the other replay tests.
    Assert.That(observed, Is.EqualTo(new[] { 1 }));
    Assert.That(foundException, Is.EqualTo(exceptionThrownFromUnderlying));
}
// A completion raised before anyone subscribes must be replayed after the buffered values.
[Test]
public void ReplaysBufferedCompletion()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);
    underlying.OnNext(1);
    underlying.OnCompleted();
    var observed = new List<int>();
    var completed = false;
    buffer.Subscribe(
        observed.Add,
        () => completed = true);
    // EqualTo keeps the assertion order-sensitive, matching the other replay tests.
    Assert.That(observed, Is.EqualTo(new[] { 1 }));
    Assert.True(completed);
}
// Once the value queue has been drained by one observer, a later observer must
// still be told about the terminal error (and receive no values).
[Test]
public void ReplaysBufferedErrorToSubsequentObservers()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);
    underlying.OnNext(1);
    underlying.OnError(exceptionThrownFromUnderlying);
    // Drain value queue, then detach so the next observer can subscribe.
    // Explicit Dispose() replaces the former "using (...) ;" empty statement.
    buffer.Subscribe(Observer.Create<int>(i => { }, e => { })).Dispose();
    var observed = new List<int>();
    Exception exceptionEncountered = null;
    buffer.Subscribe(Observer.Create<int>(observed.Add, e => exceptionEncountered = e)).Dispose();
    Assert.That(observed, Is.Empty);
    Assert.That(exceptionEncountered, Is.EqualTo(exceptionThrownFromUnderlying));
}
// Once the value queue has been drained by one observer, a later observer must
// still be told about the completion (and receive no values).
[Test]
public void ReplaysBufferedCompletionToSubsequentObservers()
{
    var underlying = new Subject<int>();
    var buffer = Buffer<int>.StartBuffering(underlying);
    underlying.OnNext(1);
    underlying.OnCompleted();
    // Drain value queue, then detach so the next observer can subscribe.
    // Explicit Dispose() replaces the former "using (...) ;" empty statement.
    buffer.Subscribe(Observer.Create<int>(i => { })).Dispose();
    var observed = new List<int>();
    var completed = false;
    buffer.Subscribe(Observer.Create<int>(observed.Add, () => completed = true)).Dispose();
    Assert.That(observed, Is.Empty);
    Assert.True(completed);
}
// Disposing the buffer must also dispose its subscription to the underlying observable.
[Test]
public void DisposingOfBufferDisposesUnderlyingSubscription()
{
    var sourceSubscriptionDisposed = false;
    // An observable that never emits; it only reports when its subscription is torn down.
    var source = Observable.Create<int>(
        _ => Disposable.Create(() => sourceSubscriptionDisposed = true));

    var buffer = Buffer<int>.StartBuffering(source);
    buffer.Dispose();

    Assert.True(sourceSubscriptionDisposed);
}