如果我是你,我会尝试尽可能接近 Rx 的实现方式来实现你的类。
关键的基本原则之一是使用相对较少的具体类,这些类使用大量操作组合在一起。因此,您应该创建一些基本的构建块并使用组合将它们组合在一起。
我将在 Reflector.NET 下初步了解两个类:AnonymousObservable<T>
& AnonymousObserver<T>
。特别AnonymousObservable<T>
是在整个 Rx 中用作实例化 observables 的基础。实际上,如果您查看派生自其中的对象,IObservable<T>
则有一些专门的实现,但仅AnonymousObservable<T>
用于一般用途。
静态方法Observable.Create<T>()
本质上是AnonymousObservable<T>
.
显然适合您要求的另一个 Rx 类是BehaviorSubject<T>
. 主题既是可观察者又是观察者,并且BehaviorSubject
适合您的情况,因为它会记住收到的最后一个值。
鉴于这些基本类,您几乎拥有创建特定对象所需的所有位。您的对象不应继承上述代码,而应使用组合来组合您需要的行为。
现在,我建议对您的类设计进行一些更改,以使它们与 Rx 更加兼容,从而更加可组合和健壮。
我会放弃你的Notifier<T>
课程,转而使用BehaviourSubject<T>
.
我会放弃你的Observer<T>
课程,转而使用AnonymousObserver<T>
.
然后我会修改ObservableValue<T>
为如下所示:
public class ObservableValue<T> : IObservable<T>, IDisposable
{
public ObservableValue(T initial) { ... }
public T Value { get; set; }
public IDisposable Subscribe(IObserver<T> observer);
public void Dispose();
}
的实现ObservableValue<T>
将包装BehaviourSubject<T>
而不是从它继承,因为暴露IObserver<T>
成员将允许访问OnCompleted
&OnError
这不会有太大意义,因为这个类代表一个值而不是计算。订阅将使用AnonymousObservable<T>
并Dispose
清理包装好的BehaviourSubject<T>
.
然后我会修改ComputedValue<T>
为如下所示:
public class ComputedValue<T> : IObservable<T>, IDisposable
{
public ComputedValue(IObservable<T> source) { ... }
public T Value { get; }
public IDisposable Subscribe(IObserver<T> observer);
public void Dispose();
}
该类ComputedValue<T>
将为所有订阅者包装AnonymousObservable<T>
,并用于source
获取Value
属性值的本地副本。该Dispose
方法将用于取消订阅source
observable。
最后两个类是您的设计似乎需要的唯一真正具体的实现——这仅仅是因为Value
属性。
ObservableValues
接下来,您需要为扩展方法创建一个静态类:
public static class ObservableValues
{
public static ObservableValue<T> Create<T>(T initial)
{ ... }
public static ComputedValue<V> Compute<T, U, V>(
this IObservable<T> left,
IObservable<U> right,
Func<T, U, V> computation)
{ ... }
}
该Compute
方法将使用 anAnonymousObservable<V>
来执行计算并生成一个IObservable<V>
传递给该ComputedValue<V>
方法返回的构造函数。
完成所有这些后,您现在可以编写以下代码:
var ov1 = ObservableValues.Create(1);
var ov2 = ObservableValues.Create(2);
var ov3 = ObservableValues.Create(3);
var cv1 = ov1.Compute(ov2, (x, y) => x + y);
var cv2 = ov3.Compute(cv1, (x, y) => x * y);
//cv2.Value == 9
ov1.Value = 2;
ov2.Value = 3;
ov3.Value = 4;
//cv2.Value == 20
请让我知道这是否有帮助和/或是否有任何我可以详细说明的内容。
编辑:还需要一些一次性用品。
您还需要实施AnonymousDisposable
&CompositeDisposable
来管理您的订阅,尤其是在Compute
扩展方法中。看看使用 Reflector.NET 的 Rx 实现或使用下面的我的版本。
public sealed class AnonymousDisposable : IDisposable
{
private readonly Action _action;
private int _disposed;
public AnonymousDisposable(Action action)
{
_action = action;
}
public void Dispose()
{
if (Interlocked.Exchange(ref _disposed, 1) == 0)
{
_action();
}
}
}
public sealed class CompositeDisposable : IEnumerable<IDisposable>, IDisposable
{
private readonly List<IDisposable> _disposables;
private bool _disposed;
public CompositeDisposable()
: this(new IDisposable[] { })
{ }
public CompositeDisposable(IEnumerable<IDisposable> disposables)
{
if (disposables == null) { throw new ArgumentNullException("disposables"); }
this._disposables = new List<IDisposable>(disposables);
}
public CompositeDisposable(params IDisposable[] disposables)
{
if (disposables == null) { throw new ArgumentNullException("disposables"); }
this._disposables = new List<IDisposable>(disposables);
}
public void Add(IDisposable disposable)
{
if (disposable == null) { throw new ArgumentNullException("disposable"); }
lock (_disposables)
{
if (_disposed)
{
disposable.Dispose();
}
else
{
_disposables.Add(disposable);
}
}
}
public IDisposable Add(Action action)
{
if (action == null) { throw new ArgumentNullException("action"); }
var disposable = new AnonymousDisposable(action);
this.Add(disposable);
return disposable;
}
public IDisposable Add<TDelegate>(Action<TDelegate> add, Action<TDelegate> remove, TDelegate handler)
{
if (add == null) { throw new ArgumentNullException("add"); }
if (remove == null) { throw new ArgumentNullException("remove"); }
if (handler == null) { throw new ArgumentNullException("handler"); }
add(handler);
return this.Add(() => remove(handler));
}
public void Clear()
{
lock (_disposables)
{
var disposables = _disposables.ToArray();
_disposables.Clear();
Array.ForEach(disposables, d => d.Dispose());
}
}
public void Dispose()
{
lock (_disposables)
{
if (!_disposed)
{
this.Clear();
}
_disposed = true;
}
}
public IEnumerator<IDisposable> GetEnumerator()
{
lock (_disposables)
{
return _disposables.ToArray().AsEnumerable().GetEnumerator();
}
}
IEnumerator IEnumerable.GetEnumerator()
{
return this.GetEnumerator();
}
public bool IsDisposed
{
get
{
return _disposed;
}
}
}