7

反应式扩展允许您使用 轻松订阅事件Observable.FromEventPattern,但我找不到任何关于当您拥有IObservable.

我的情况是这样的:我需要实现一个包含事件的接口。每当我的对象的某个值发生更改时,都应该调用该事件,并且出于线程安全原因,我需要在某个SynchronizationContext. 我还应该使用注册时的当前值调用每个事件处理程序。

public interface IFooWatcher
{
    event FooChangedHandler FooChanged;
}

使用 Rx 获得一个可以满足我要求的 observable 相当容易BehaviorSubject

public class FooWatcher
{
    private readonly BehaviorSubject<Foo> m_subject;
    private readonly IObservable<Foo> m_observable;

    public FooWatcher(SynchronizationContext synchronizationContext, Foo initialValue)
    {
        m_subject = new BehaviorSubject<Foo>(initialValue);
        m_observable = m_subject
            .DistinctUntilChanged()
            .ObserveOn(synchronizationContext);
    }

    public event FooChangedHandler FooChanged
    {
        add { /* ??? */ }
        remove { /* ??? */ }
    }
}

现在我正在寻找一种简单的方法来让addandremove函数订阅和取消订阅FooChangedHandler作为Observer<Foo>on传递的内容m_observable。我当前的实现看起来与此类似:

    add
    {
        lock (m_lock)
        {
            IDisposable disp = m_observable.Subscribe(value);
            m_registeredObservers.Add(
                new KeyValuePair<FooChangedHandler, IDisposable>(
                    value, disp));
        }
    }

    remove
    {
        lock (m_lock)
        {
            KeyValuePair<FooChangedHandler, IDisposable> observerDisposable =
                m_registeredObservers
                    .First(pair => object.Equals(pair.Key, value));
            m_registeredObservers.Remove(observerDisposable);
            observerDisposable.Value.Dispose();
        }
    }

但是,我希望找到一个更简单的解决方案,因为我需要实现其中的几个事件(具有不同的处理程序类型)。我尝试推出自己的通用解决方案,但它产生了一些需要解决的额外问题(特别是,您通常如何使用带有参数的委托T),所以我更愿意找到一个现有的解决方案来桥接在这个方向上的差距 - 就像FromEventPattern反过来一样。

4

2 回答 2

2

你可以这样做:

public event FooChangedHandler FooChanged
{
    add { m_observable.ToEvent().OnNext += value; }
    remove { m_observable.ToEvent().OnNext -= value; }
}

但是,在删除时,我认为您可能只想处理订阅......或者可能从 ToEvent() 获取 Action 并将其存储为成员。未经测试。

编辑:但是,您必须使用 Action 而不是 FooChangedHandler 委托。

编辑 2:这是一个经过测试的版本。但是,我想您需要使用 FooChangedHandler,因为您有一堆这些预先存在的处理程序?

void Main()
{
    IObservable<Foo> foos = new [] { new Foo { X = 1 }, new Foo { X = 2 } }.ToObservable();
    var watcher = new FooWatcher(SynchronizationContext.Current, new Foo { X = 12 });
    watcher.FooChanged += o => o.X.Dump();  
    foos.Subscribe(watcher.Subject.OnNext); 
}

// Define other methods and classes here

//public delegate void FooChangedHandler(Foo foo);
public interface IFooWatcher
{
    event Action<Foo> FooChanged;
}

public class Foo {
    public int X { get; set; }
}
public class FooWatcher
{

    private readonly BehaviorSubject<Foo> m_subject;
    public BehaviorSubject<Foo> Subject { get { return m_subject; } }
    private readonly IObservable<Foo> m_observable;

    public FooWatcher(SynchronizationContext synchronizationContext, Foo initialValue)
    {
        m_subject = new BehaviorSubject<Foo>(initialValue);

        m_observable = m_subject
            .DistinctUntilChanged();
    }

    public event Action<Foo> FooChanged
    {
        add { m_observable.ToEvent().OnNext += value; }
        remove { m_observable.ToEvent().OnNext -= value; }
    }
}
于 2013-03-05T19:52:29.403 回答
0

鉴于您已经混合了反应式代码和更普通代码之间的界限,您可以做一个反应性较低的版本。首先声明一个正常的事件模式

public event FooChangedHandler FooChanged;

protected void OnFooChanged(Foo)
{
  var temp = FooChanged;
  if (temp != null)
  {  
    temp(new FooChangedEventArgs(Foo));
  }
}

然后在构造函数中简单地将 observable 连接到它

m_Observable.Subscribe(foo => OnFooChanged(foo));

它不是很 Rx,但它非常简单。

于 2013-03-06T07:11:28.213 回答