39

Reactive Extensions 附带了许多帮助方法,用于将现有事件和异步操作转换为可观察对象,但是您将如何从头开始实现 IObservable<T> 呢?

IEnumerable 具有可爱的 yield 关键字,使其非常易于实现。

实现 IObservable<T> 的正确方法是什么?

我需要担心线程安全吗?

我知道支持在特定同步上下文中回调,但这是我作为 IObservable<T> 作者需要担心的事情,还是以某种方式内置的?

更新:

这是我的 C# 版本的 Brian 的 F# 解决方案

using System;
using System.Linq;
using Microsoft.FSharp.Collections;

namespace Jesperll
{
    class Observable<T> : IObservable<T>, IDisposable where T : EventArgs
    {
        private FSharpMap<int, IObserver<T>> subscribers = 
                 FSharpMap<int, IObserver<T>>.Empty;
        private readonly object thisLock = new object();
        private int key;
        private bool isDisposed;

        public void Dispose()
        {
            Dispose(true);
        }

        protected virtual void Dispose(bool disposing)
        {
            if (disposing && !isDisposed)
            {
                OnCompleted();
                isDisposed = true;
            }
        }

        protected void OnNext(T value)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnNext(value);
            }
        }

        protected void OnError(Exception exception)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            if (exception == null)
            {
                throw new ArgumentNullException("exception");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnError(exception);
            }
        }

        protected void OnCompleted()
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("Observable<T>");
            }

            foreach (IObserver<T> observer in subscribers.Select(kv => kv.Value))
            {
                observer.OnCompleted();
            }
        }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null)
            {
                throw new ArgumentNullException("observer");
            }

            lock (thisLock)
            {
                int k = key++;
                subscribers = subscribers.Add(k, observer);
                return new AnonymousDisposable(() =>
                {
                    lock (thisLock)
                    {
                        subscribers = subscribers.Remove(k);
                    }
                });
            }
        }
    }

    class AnonymousDisposable : IDisposable
    {
        Action dispose;
        public AnonymousDisposable(Action dispose)
        {
            this.dispose = dispose;
        }

        public void Dispose()
        {
            dispose();
        }
    }
}

编辑:如果 Dispose 被调用两次,不要抛出 ObjectDisposedException

4

5 回答 5

11

官方文档不赞成用户自己实现 IObservable 。相反,用户应该使用工厂方法Observable.Create

如果可能,通过组合现有的运算符来实现新的运算符。否则使用 Observable.Create 实现自定义运算符

碰巧 Observable.Create 是 Reactive 内部类的一个简单的包装器AnonymousObservable

public static IObservable<TSource> Create<TSource>(Func<IObserver<TSource>, IDisposable> subscribe)
{
    if (subscribe == null)
    {
        throw new ArgumentNullException("subscribe");
    }
    return new AnonymousObservable<TSource>(subscribe);
}

我不知道他们为什么不公开他们的实施,但是,不管怎样。

于 2012-06-14T10:38:23.873 回答
9

老实说,我不确定这一切有多“正确”,但根据我目前的经验,如果感觉还不错。它是 F# 代码,但希望您能体会其中的味道。它允许您“新建”一个源对象,然后您可以调用 Next/Completed/Error on,它管理订阅并在源或客户端做坏事时尝试断言。

type ObservableSource<'T>() =     // '
    let protect f =
        let mutable ok = false
        try 
            f()
            ok <- true
        finally
            Debug.Assert(ok, "IObserver methods must not throw!")
            // TODO crash?
    let mutable key = 0
    // Why a Map and not a Dictionary?  Someone's OnNext() may unsubscribe, so we need threadsafe 'snapshots' of subscribers to Seq.iter over
    let mutable subscriptions = Map.empty : Map<int,IObserver<'T>>  // '
    let next(x) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnNext(x)))
    let completed() = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnCompleted()))
    let error(e) = subscriptions |> Seq.iter (fun (KeyValue(_,v)) -> protect (fun () -> v.OnError(e)))
    let thisLock = new obj()
    let obs = 
        { new IObservable<'T> with       // '
            member this.Subscribe(o) =
                let k =
                    lock thisLock (fun () ->
                        let k = key
                        key <- key + 1
                        subscriptions <- subscriptions.Add(k, o)
                        k)
                { new IDisposable with 
                    member this.Dispose() = 
                        lock thisLock (fun () -> 
                            subscriptions <- subscriptions.Remove(k)) } }
    let mutable finished = false
    // The methods below are not thread-safe; the source ought not call these methods concurrently
    member this.Next(x) =
        Debug.Assert(not finished, "IObserver is already finished")
        next x
    member this.Completed() =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        completed()
    member this.Error(e) =
        Debug.Assert(not finished, "IObserver is already finished")
        finished <- true
        error e
    // The object returned here is threadsafe; you can subscribe and unsubscribe (Dispose) concurrently from multiple threads
    member this.Value = obs

我会对任何关于这里好坏的想法感兴趣;我还没有机会看到来自 devlabs 的所有新的 Rx 东西......

我自己的经验表明:

  • 那些订阅 observables 的人永远不应该从订阅中抛出。当订阅者抛出异常时,observable 无法做任何合理的事情。(这类似于事件。)异常很可能只是冒泡到顶级的 catch-all 处理程序或使应用程序崩溃。
  • 源可能应该是“逻辑上单线程的”。我认为编写可以对并发 OnNext 调用做出反应的客户端可能更难;即使每个单独的调用来自不同的线程,也有助于避免并发调用。
  • 拥有一个强制执行一些“合同”的基类/助手类绝对有用。

我很好奇人们是否可以在这些方面提出更具体的建议。

于 2009-11-20T14:37:13.683 回答
7

是的,yield 关键字很可爱;也许 IObservable(OfT) 会有类似的东西?[编辑:在 Eric Meijer 的PDC '09 演讲中,他说“是的,注意这个空间”以声明性的收益来生成可观察的。]

对于接近的东西(而不是自己滚动),请查看“ (尚未)101 Rx Samples ”维基的底部,团队建议使用 Subject(T) 类作为“后端”来实现 IObservable(的)。这是他们的例子:

public class Order
{            
    private DateTime? _paidDate;

    private readonly Subject<Order> _paidSubj = new Subject<Order>();
    public IObservable<Order> Paid { get { return _paidSubj.AsObservable(); } }

    public void MarkPaid(DateTime paidDate)
    {
        _paidDate = paidDate;                
        _paidSubj.OnNext(this); // Raise PAID event
    }
}

private static void Main()
{
    var order = new Order();
    order.Paid.Subscribe(_ => Console.WriteLine("Paid")); // Subscribe

    order.MarkPaid(DateTime.Now);
}
于 2010-09-07T01:29:31.617 回答
2
  1. 破解打开反射器,看看。

  2. 观看一些 C9 视频 -这个视频展示了如何“导出” Select 'combinator'

  3. 秘诀是创建 AnonymousObservable、AnonymousObserver 和 AnonymousDisposable 类(它们只是解决您无法实例化接口的问题)。它们包含零实现,因为您使用 Actions 和 Funcs 传递它。

例如:

public class AnonymousObservable<T> : IObservable<T>
{
    private Func<IObserver<T>, IDisposable> _subscribe;
    public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe)
    {
        _subscribe = subscribe;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _subscribe(observer);
    }
}

我会让你解决剩下的……这是一个很好的理解练习。

这里有一个很好的小线程,里面有相关的问题。

于 2009-11-20T10:13:58.737 回答
2

关于这个实现只有一句话:

在 .net fw 4 中引入并发集合之后,使用 ConcurrentDictioary 而不是简单的字典可能会更好。

它节省了对集合的处理锁。

阿迪。

于 2010-07-06T07:38:37.427 回答