这是 @Shawn 答案的可运行版本(他的答案缺少 Unsubscribe<T> 的实现)。感谢 @Shawn 的出色工作!
请注意,这是为了便于理解 Subject<T> 而给出的最简可运行实现,它不是线程安全的。
/// <summary>
/// Simplest possible implementation of Subject(T). See http://bit.ly/1QUdpq1.
/// Every notification pushed into this object is forwarded to all currently
/// subscribed observers. No synchronization is performed, so this type is not
/// thread-safe.
/// </summary>
/// <typeparam name="T">The type of the values relayed to the observers.</typeparam>
public class TempObservable<T> : ISubject<T>, IDisposable
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    #region IObservable<T> Members

    /// <summary>
    /// Registers an observer so that it receives subsequent notifications.
    /// </summary>
    /// <param name="observer">The observer to register.</param>
    /// <returns>A handle that removes only this observer when disposed.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
    public IDisposable Subscribe(IObserver<T> observer)
    {
        // Reject null here; otherwise the failure would surface later as a
        // NullReferenceException in the middle of a notification.
        if (observer == null)
        {
            throw new ArgumentNullException("observer");
        }

        this.observers.Add(observer);

        // Could return "this", but this would introduce an issue: if one subscriber unsubscribed, it would
        // unsubscribe all subscribers.
        return new Unsubscribe<T>(this.observers, observer);
    }

    #endregion

    #region IObserver<T> Members

    /// <summary>
    /// Forwards a value to every subscribed observer, in subscription order.
    /// </summary>
    /// <param name="value">The value to forward.</param>
    public void OnNext(T value)
    {
        // Iterate over a snapshot: an observer may subscribe or unsubscribe from
        // inside its callback, which would invalidate an enumeration of the live list.
        foreach (IObserver<T> ob in this.observers.ToArray())
        {
            ob.OnNext(value);
        }
    }

    /// <summary>
    /// Forwards an error to every subscribed observer. The subscriptions are kept.
    /// </summary>
    /// <param name="e">The error to forward.</param>
    public void OnError(Exception e)
    {
        // Snapshot for the same reason as in OnNext.
        foreach (IObserver<T> ob in this.observers.ToArray())
        {
            ob.OnError(e);
        }
    }

    /// <summary>
    /// Tells every subscribed observer that the sequence has completed and then
    /// drops all subscriptions.
    /// </summary>
    public void OnCompleted()
    {
        IObserver<T>[] snapshot = this.observers.ToArray();
        try
        {
            foreach (IObserver<T> ob in snapshot)
            {
                ob.OnCompleted();
            }
        }
        finally
        {
            // Clear only after notifying, and even if an observer throws, so that
            // observers which unsubscribe from their completion callback still
            // find themselves in the list.
            this.Dispose();
        }
    }

    #endregion

    /// <summary>
    /// Removes all subscriptions without notifying the observers. The instance
    /// can still accept new subscriptions afterwards.
    /// </summary>
    public void Dispose()
    {
        observers.Clear();
    }
}
/// <summary>
/// Subscription handle: disposing it removes one observer from the list of
/// observers it was created with.
/// </summary>
/// <typeparam name="T">The type of the values the observer receives.</typeparam>
public class Unsubscribe<T> : IDisposable
{
    private readonly IObserver<T> _observer;
    private readonly List<IObserver<T>> _observers;

    /// <summary>
    /// Creates a handle for <paramref name="observer"/> within <paramref name="observers"/>.
    /// </summary>
    /// <param name="observers">The list the observer is removed from on disposal.</param>
    /// <param name="observer">The observer to remove on disposal.</param>
    public Unsubscribe(List<IObserver<T>> observers, IObserver<T> observer)
    {
        _observer = observer;
        _observers = observers;
    }

    /// <summary>
    /// Removes the observer from the list. Does nothing if the observer is no
    /// longer in the list, so calling this more than once is safe.
    /// </summary>
    public void Dispose()
    {
        // Remove is a no-op when the item is absent. IndexOf + RemoveAt would
        // throw ArgumentOutOfRangeException on a second Dispose, or when the
        // list has already been cleared.
        _observers.Remove(_observer);
    }
}
/// <summary>
/// Exercises subscribing, unsubscribing a single observer, clearing all
/// observers and re-subscribing through an operator on <c>TempObservable</c>.
/// </summary>
[TestFixture]
public static class TempObservable_Test
{
    /// <summary>
    /// Pushes the values 1 to 5 through the subject while changing the set of
    /// subscribers, and verifies exactly which subscriber saw which value.
    /// </summary>
    [Test]
    public static void UnitTest()
    {
        // Every notification is recorded here so the test can fail on a
        // regression; the console output is kept for manual inspection.
        var received = new List<string>();
        TempObservable<int> x = new TempObservable<int>();

        x.Subscribe(o =>
        {
            Console.Write("Test 1: {0}\n", o);
            received.Add("Test 1: " + o);
        });
        var toDispose = x.Subscribe(o =>
        {
            Console.Write("Test 2: {0}\n", o);
            received.Add("Test 2: " + o);
        });

        // Both subscribers are active.
        x.OnNext(1);

        // Only the second subscriber is removed; the first must still receive 2.
        toDispose.Dispose();
        x.OnNext(2);

        // Clearing the subject drops the remaining subscriber, so nobody sees 3.
        x.Dispose();
        x.OnNext(3);

        // The subject accepts new subscriptions after being cleared; the filter
        // lets only the value 5 through.
        x.Where(o => o == 5).Subscribe(o =>
        {
            Console.Write("Tested: {0}\n", o);
            received.Add("Tested: " + o);
        });
        x.OnNext(4);
        x.OnNext(5);

        Assert.That(
            received,
            Is.EqualTo(new[] { "Test 1: 1", "Test 2: 1", "Test 1: 2", "Tested: 5" }));
    }
}