0

我无法说服我的老板接受我们产品中安装的 Rx 库,所以为了学习或编码方便,我更喜欢实现一些简单的功能,如官方 Rx 中提供的,我这里选择 2 正是我现在需要的(我已经实现了一些基本概念,例如 IObservable、IObserver、ISubject):

Where<TSource>(IObservable<TSource>, Func<TSource, Boolean>)
Timeout<TSource>(IObservable<TSource>, DateTimeOffset)

我花了一些时间思考如何实现“在哪里”,但即使阅读一些 ILSpy 代码也无法得到任何想法,毕竟它不像 IEnumerable,有人可以给出一些提示吗?

4

3 回答 3

2

最后我对通过一个TempObservable<T>非常简单的实现它有了一些想法,示例代码如下所示:

private class TempObservable<T> : ISubject<T>
    {
        private List<IObserver<T>> observers = new List<IObserver<T>>();
        //private IObserver<T> observer;

        #region IObservable<T> Members

        public IDisposable Subscribe(IObserver<T> observer)
        {
            this.observers.Add(observer);
            return new UnSubscribe<T>(this.observers, observer);
        }

        #endregion


        #region IObserver<T> Members

        public void OnCompleted()
        {
            throw new NotImplementedException();
        }

        public void OnError(Exception e)
        {
            this.observers.ForEach((ob) => ob.OnError(e));
        }

        public void OnNext(T value)
        {
            this.observers.ForEach((ob) => ob.OnNext(value));
        }

        #endregion
    }

然后我们可以写我们自己的Where<T>

public static IObservable<T> Where<T>(this IObservable<T> source, Func<T, bool> predict)
    {
        ISubject<T> filteredObservable = new TempObservable<T>();
        source.Subscribe(s => { if (predict(s)) { filteredObservable.OnNext(s); } }, () => { filteredObservable.OnCompleted(); }, ex => { filteredObservable.OnError(ex); });
        return filteredObservable;
    }

与其他有用的扩展类似。

于 2013-08-05T07:14:55.437 回答
0

为什么不使用支持 .NET 3.5 的响应式扩展 1.x?我相信这是支持 .NET 3.5 SP1 的最后一个版本:http: //www.microsoft.com/en-us/download/details.aspx?id=28018

rx 的源代码也可以在http://rx.codeplex.com获得

于 2013-06-06T08:39:10.617 回答
-2

这是答案的工作版本@Shawn(他的答案缺少UnSubscribe<T>方法)。感谢您@Shawn的出色工作!

请注意,这是最简单的工作实现,以便理解Subject<T>. 它不是线程安全的。

/// <summary>
///     Simplest possible implementation of Subject(T). See http://bit.ly/1QUdpq1.
/// </summary>
/// <typeparam name="T"></typeparam>
public class TempObservable<T> : ISubject<T>, IDisposable
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    #region IObservable<T> Members  
    public IDisposable Subscribe(IObserver<T> observer)
    {
        this.observers.Add(observer);

        // Could return ".this", but this would introduce an issue: if one subscriber unsubscribed, it would
        // unsubscribe all subscribers.
        return new Unsubscribe<T>(this.observers, observer);
    }
    #endregion


    #region IObserver<T> Members
    public void OnNext(T value)
    {
        this.observers.ForEach((ob) => ob.OnNext(value));
    }
    public void OnError(Exception e)
    {
        this.observers.ForEach((ob) => ob.OnError(e));
    }
    public void OnCompleted()
    {
        this.Dispose();
    }
    #endregion

    public void Dispose()
    {
        observers.Clear();
    }
}

public class Unsubscribe<T> : IDisposable
{
    private readonly IObserver<T> _observer;
    private readonly List<IObserver<T>> _observers;

    public Unsubscribe(List<IObserver<T>> observers, IObserver<T> observer)
    {
        _observer = observer;
        _observers = observers;
    }

    public void Dispose()
    {
        int index = _observers.IndexOf(_observer);
        _observers.RemoveAt(index);
    }
}

[TestFixture]
public static class TempObservable_Test
{
    [Test]
    public static void UnitTest()
    {

        TempObservable<int> x = new TempObservable<int>();
        x.Subscribe(o =>
        {
            Console.Write("Test 1: {0}\n", o);
        });
        var toDispose = x.Subscribe(o =>
        {
            Console.Write("Test 2: {0}\n", o);
        });

        x.OnNext(1);
        toDispose.Dispose();
        x.OnNext(2);
        x.Dispose();
        x.OnNext(3);

        x.Where(o => o == 5).Subscribe(o =>
        {
            Console.Write("Tested: {0}\n", o);
        });
        x.OnNext(4);
        x.OnNext(5);
    }
}
于 2015-12-26T17:41:04.427 回答