StackOverflow 上有很多关于使用 ReactiveX 和是否使用Subjects问题。这些问题中的大多数最终都会被人们在哲学基础上来回争论,最终在实际示例中相当轻松,并且谈论而不是“正确”方法与“最佳”方法。

我编写了一个简单的示例类,它尝试将 SignalR 的 IHubProxy.On 桥接到 IObservable。



using Microsoft.AspNet.SignalR;
using Microsoft.AspNet.SignalR.Client;
using Microsoft.AspNet.SignalR.Hubs;
using System.Reactive.Linq;
using Owin;
using System;
using System.Reactive.Subjects;

namespace Acme.Core.SignalR
    public class SignalRPipe :IObservable<PipeCommand>, IDisposable
        readonly IHubProxy _MyHub;
        readonly IDisposable _HubOnListener;

        // this next line is BAD and I want to get rid of it, but I can't figure out how.
        private readonly Subject<PipeCommand> subject = new Subject<PipeCommand>();

        private SignalRPipe() : base()

        private SignalRPipe( IHubProxy hub ) : this()
            _MyHub = hub;
            _HubOnListener = _MyHub.On<PipeCommand>( "OnPipeCommand" , OnPipeCommand );

        private void OnPipeCommand( PipeCommand obj )
            subject.OnNext( obj );

        public IDisposable Subscribe( IObserver<PipeCommand> observer )
            return subject.Subscribe( observer );
            //var rtrn = Observable.Create<PipeCommand>( ob => {observer.Subscribe(t=> {  }, ,  } );
            //return rtrn;

        #region IDisposable Support
        private bool disposedValue = false; // To detect redundant calls

        protected virtual void Dispose( bool disposing )
            if ( !disposedValue )
                disposedValue = true;
                if ( disposing )


        public void Dispose()
            // Do not change this code. Put cleanup code in Dispose(bool disposing) above.
            Dispose( true );




using Microsoft.AspNet.SignalR.Client;
using System;

namespace Acme.Core.SignalR
    public class SignalRPipe : IObservable<PipeCommand>
        readonly IHubProxy _MyHub;

        private SignalRPipe() : base()

        private SignalRPipe( IHubProxy hub ) : this()
            _MyHub = hub;

        public IDisposable Subscribe( IObserver<PipeCommand> observer )
            var rtrn = _MyHub.On<PipeCommand>( "OnPipeCommand" , pc => observer.OnNext( pc ) );
            return rtrn;


