3

我将 Rx 与使用EventPattern. StartWatching()在此 API 中,您在对象上注册事件处理程序,然后在启动事件以开始触发的对象上调用方法。我正在使用Observable.FromEventPatternRx 世界中的 API 来桥接 API,但我遇到了非常奇怪的问题,即订阅只有在调用StartWatching(). 下面是我所看到的简化案例。

这有效:

foreach (var iq in interactionQueues)
        {
            Observable.FromEventPattern(iq, "TheEvent")
                .Subscribe(e => Log.Info("I got called!"), 
                       e => Log.Info("Error!", e),
                       () => Console.WriteLine("Seq completed!"));

            iq.StartWatching();
        }

如果我在不同的循环中调用Subscribe()andStartWatching()它会停止工作:

foreach (var iq in interactionQueues)
            Observable.FromEventPattern(iq, "TheEvent")
                .Subscribe(e => Log.Info("I got called!"), 
                       e => Log.Info("Error!", e),
                       () => Console.WriteLine("Seq completed!"));
foreach (var iq in interactionQueues)
           iq.StartWatching();

对于为什么会发生这种情况,我唯一的想法是观察或订阅发生在错误的线程上。我曾尝试使用Scheduler.CurrentThreadand Scheduler.Immediatewith SubscribeOnObserveOn但这并没有帮助。还有其他想法吗?我应该尝试不同的Scheduler还是那是红鲱鱼?

4

1 回答 1

1

让我们用一种更友好的方法来包装它:

public static TheEventArgs WatchEvent(this InteractionQueue this)
{
    var ret = Observable.Create<TheEventArgs>(subj => {
        // This entire block gets called every time someone calls Subscribe
        var disp = new CompositeDisposable();

        // Subscribe to the event
        disp.Add(Observable.FromEventPattern(iq, "TheEvent").Subscribe(subj));

        // Stop watching when we're done
        disp.Add(Disposable.Create(() => iq.StopWatching());

        iq.StartWatching();

        // This is what to Dispose on Unsubscribe
        return disp;
    });

    // When > 1 person Subscribes, only call the block above (i.e. StartWatching) once
    return ret.Multicast(new Subject<TheEventArgs>()).RefCount();
}
于 2012-12-13T22:27:04.130 回答