1

考虑IObservable<T>以下Subscribe实现:

public IDisposable Subscribe(IObserver<T> observer)
{
    if (observer == null)
    {
        throw new ArgumentNullException("observer");
    }

    lock (_subscriberSync)
    {
        var accepted = OnSubscribing(observer);   // <------ policy can be applied here

        if (!accepted)
        {
            /* #1 */ return null;
            /* #2 */ // return NullSubscription.Instance;
            /* #3 */ // throw new SubscriptionRejectedException("reason");
        }

        // ... perform actual subscription went here
    }
}

似乎没有任何关于如何建立被拒绝订阅的指导。理想情况下,我们有一个Boolean TrySubscribe(IObserver<T> observer, out IDisposable subscription)orMaybe<IDisposable> Subscribe(IObserver<T> observer)来表示条件,但似乎我们只能选择标志值或带外异常。每个都有缺点:

使用#1似乎我遇到的所有代码都不会检查 null,甚至 Resharper 静态分析也会NotNull在其上添加一个属性。

对于#2,我必须测试NullSubscription与#1没有太大区别的a,除非它不是很容易发现(典型的是“魔术”返回值)。

对于#3,我们有特殊情况下的典型流程,但对于延迟订阅,它会使调试复杂化。

除了您已经实施的这三种方法之外,还有其他选择吗?

4

2 回答 2

2

有什么问题observer.onError(new SomeException())

此外,您似乎正在手动实施IObservable<T>建议不要这样做。如果你必须这样做,你可以这样做:

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (_subscriberSync)
        {
            var accepted = OnSubscribing(observer);   // <------ policy can be applied here

            if (!accepted)
            {
                observer.OnError(new SubscriptionRejectedException("reason"));
                return Disposable.Empty;
            }

            // ... perform actual subscription went here
        }
    }
于 2013-03-12T14:42:21.797 回答
0

我假设您有充分的实施理由IObservable<T>,因此我们将对此进行讨论-因此,如果我正确阅读了您的问题,您想知道在“订阅失败”的情况下返回什么很好地融合在一起与链式 Rx 查询逻辑的其余部分......嗯......好吧,我可能会使用该Disposable.Create方法:

public class MyObservable<T> : IObservable<T>
{
    private static object _subscriberSync = new object();
    private IObservable<T> _source;

    public MyObservable(IObservable<T> source)
    {
        _source = source;
    }

    protected virtual bool OnSubscribing(IObserver<T> obs)
    {
        return false;
    }

    public void DohSubscriptionFailed()
    {
        // Whatever the heck you want to do here?
        Console.WriteLine("Sorry, you never got a subscription");
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null)
        {
            throw new ArgumentNullException("observer");
        }

        lock (_subscriberSync)
        {
            var accepted = OnSubscribing(observer);   // <------ policy can be applied here

            if (!accepted)
            {
                return Disposable.Create(() => DohSubscriptionFailed());
            }

            // ... perform actual subscription went here
            return _source.Subscribe(observer);
        }
    }
}

用法和输出:

void Main()
{
    // A faked up source
    var source = new Subject<bool>();

    var query = new MyObservable<bool>(source);
    using(query.Subscribe(Console.WriteLine))
    {
        // nothing on output...
        source.OnNext(true);
        // still nothing on output...
        source.OnNext(false);
        Console.ReadLine();
    }
    // dispose fires, outputs "Sorry, you never got a subscription"
}

现在,正如这里所写的那样,它DohSubscriptionFailed并没有做任何有用的事情,但是根据您的整个用例,您可以在那里触发您想要的任何东西。

于 2013-03-13T15:55:04.063 回答