2

我有一个公开IObservable状态的 API。但是这种状态取决于必须通过初始化的底层可观察源Init

我想做的是保护用户不必以正确的顺序做事:就目前的情况而言,如果他们Status在执行之前尝试订阅Init,他们会得到一个异常,因为他们的源没有初始化。

所以我有一个天才的想法,使用 aSubject将两者解耦:订阅 my 的外部用户Status只是订阅了 Subject,然后当他们调用 时Init,我使用我的 Subject 订阅底层服务。

代码中的想法

private ISubject<bool> _StatusSubject = new Subject<bool>();
public IObservable<bool> Status { get { return _StatusSubject; } }

public void Init() 
{
    _Connection = new Connection();
    Underlying.GetDeferredObservable(_Connection).Subscribe(_StatusSubject);
}

但是,从对虚拟项目的测试来看,问题在于初始化通过订阅 Subject 来“唤醒”我的底层 Observable,即使还没有人订阅该主题。如果可能的话,我想避免这种情况,但我不确定如何......

(我也注意到“一般规则是,如果你使用一个主题,那么你做错了什么”这一公认的智慧)

4

3 回答 3

7

似乎您缺少的概念是如何知道某人何时开始收听并仅初始化您的基础来源。通常你使用Observable.Create它的兄弟姐妹之一 ( Defer, Using, ...) 来做到这一点。

以下是没有 a 的方法Subject

private IObservable<bool> _status = Observable.Defer(() =>
{
    _Connection = new Connection();
    return Underlying.GetDeferredObservable(_Connection);
};

public IObservable<bool> Status { get { return _status; } }

Defer在有人真正订阅之前不会调用初始化代码。

但这有几个潜在的问题:

  1. 每个观察者都会建立一个新的连接
  2. 当观察者取消订阅时,连接不会被清除。

第二个问题很容易解决,所以让我们先解决这个问题。假设你Connection是一次性的,在这种情况下你可以这样做:

private IObservable<bool> _status = Observable
    .Using(() => new Connection(),
           connection => Underlying.GetDeferredObservable(connection));

public IObservable<bool> Status { get { return _status; } }

通过这个迭代,每当有人订阅时,都会Connection创建一个新的并传递给第二个 Lamba 方法来构造 observable。每当观察者退订时,Connection就是Disposed。如果Connection不是 a IDisposable,那么您可以使用Disposable.Create(Action)创建一个IDisposable它将运行您需要运行的任何操作来清理连接。

您仍然有每个观察者创建一个新连接的问题。我们可以使用PublishandRefCount来解决这个问题:

private IObservable<bool> _status = Observable
    .Using(() => new Connection(),
           connection => Underlying.GetDeferredObservable(connection))
    .Publish()
    .RefCount();

public IObservable<bool> Status { get { return _status; } }

现在,当第一个观察者订阅时,将创建连接并订阅底层的 observable。后续观察者将共享连接并获取当前状态。当最后一个观察者取消订阅时,连接将被释放,一切都关闭。如果在那之后另一个观察者订阅了,这一切都会重新开始。

在底层,Publish实际上是使用 aSubject来共享单个可观察源。并且RefCount正在跟踪目前有多少观察者正在观察。

于 2013-05-07T23:15:54.973 回答
1

我可能在这里过于简单化了,但让我Subject按要求使用:

你的Thingy

public class Thingy
{
    private BehaviorSubject<bool> _statusSubject = new BehaviorSubject<bool>(false);    
    public IObservable<bool> Status
    {
        get
        {
            return _statusSubject;
        }
    }

    public void Init()
    {
        var c = new object();
        new Underlying().GetDeferredObservable(c).Subscribe(_statusSubject);
    }
}

一个伪造的Underlying

public class Underlying
{
    public IObservable<bool> GetDeferredObservable(object connection)
    {
        return Observable.DeferAsync<bool>(token => {
            return Task.Factory.StartNew(() => {
                Console.WriteLine("UNDERLYING ENGAGED");
                Thread.Sleep(1000);
                // Let's pretend there's some static on the line...
                return Observable.Return(true)
                    .Concat(Observable.Return(false))
                    .Concat(Observable.Return(true));
            }, token);
        });
    }
}

线束:

void Main()
{
    var thingy = new Thingy();
    using(thingy.Status.Subscribe(stat => Console.WriteLine("Status:{0}", stat)))
    {
        Console.WriteLine("Waiting three seconds to Init...");
        Thread.Sleep(3000);
        thingy.Init();
        Console.ReadLine();
    }
}

输出:

Status:False
Waiting three seconds to Init...
UNDERLYING ENGAGED
Status:True
Status:False
Status:True
于 2013-05-07T15:58:15.000 回答
0

嗯,玩过这个,我不认为我可以只用一个主题来做到这一点。

尚未完成测试/尝试,但这是我目前提出的似乎可行的方法,但它并不能保护我免受主题问题的影响,因为我仍在内部使用一个。

public class ObservableRouter<T> : IObservable<T>
{
    ISubject<T> _Subject = new Subject<T>();
    Dictionary<IObserver<T>, IDisposable> _ObserverSubscriptions 
                               = new Dictionary<IObserver<T>, IDisposable>();
    IObservable<T> _ObservableSource;
    IDisposable _SourceSubscription;

    //Note that this can happen before or after SetSource
    public IDisposable Subscribe(IObserver<T> observer)
    {
        _ObserverSubscriptions.Add(observer, _Subject.Subscribe(observer));
        IfReadySubscribeToSource();
        return Disposable.Create(() => UnsubscribeObserver(observer));
    }

    //Note that this can happen before or after Subscribe
    public void SetSource(IObservable<T> observable)
    {
        if(_ObserverSubscriptions.Count > 0 && _ObservableSource != null) 
                  throw new InvalidOperationException("Already routed!");
        _ObservableSource = observable;
        IfReadySubscribeToSource();
    }

    private void IfReadySubscribeToSource()
    {
        if(_SourceSubscription == null &&
           _ObservableSource != null && 
           _ObserverSubscriptions.Count > 0)
        {
            _SourceSubscription = _ObservableSource.Subscribe(_Subject);
        }
    }

    private void UnsubscribeObserver(IObserver<T> observer)
    {
        _ObserverSubscriptions[observer].Dispose();
        _ObserverSubscriptions.Remove(observer);
        if(_ObserverSubscriptions.Count == 0)
        {
            _SourceSubscription.Dispose();
            _SourceSubscription = null;
        }
    }
}
于 2013-05-07T09:52:20.310 回答