10

我正在将响应式扩展用于一种进程中的消息总线。实现非常简单。

注册看起来像

public IDisposable Register<T>(Action<T> action) where T : IMessage
{
   return this.subject
        .OfType<T>()
        .Subscribe(action);
}

并简单地发送:

private void SendMessage(IMessage message)
    {
        this.subject.OnNext(message);
    }

但是我现在对 RX 的异常行为有一些麻烦。在注册/订阅的操作中引发异常 - Observable“流”已损坏,将不再订阅。由于此消息总线用于应用程序的两个部分进行通信,因此我需要确保即使抛出意外异常也不会破坏这样的流。

4

2 回答 2

13

如果您需要确保流永远不会被异常中断,那么您需要为异常提供另一个通道。

这种行为并非完全出乎意料。从IObserver<T>接口的文档:

OnError 方法,通常由提供程序调用以指示数据不可用、不可访问或损坏,或者提供程序遇到了其他错误情况。

鉴于此,如果流不可用、损坏等,您肯定希望流“出现故障”(这类似于 WCF 中出现故障的通道);状态是不确定的,所以你不能依赖来自IObservable<T>实现的任何其他东西;那么为什么要期望会有更多的观察呢?

也就是说,您有一些选择:

吞下异常

您必须将action传递给Register函数的委托包装起来,如下所示:

public IDisposable Register<T>(Action<T> action) where T : IMessage
{
   return this.subject
        .OfType<T>()
        .Subscribe(t => {
            // Execute action
            try { action(t); }
            catch { }
         });
}

当然,这可能是不可取的,因为您可能会丢弃影响程序的异常(或者,您可能确切地知道这里发生了什么,并想跳过它们),但它可以用于构建下一个解决方案。

提供抛出异常时要采取的操作

使用上述作为基础,您可以要求在Action<T, Exception>抛出异常时调用哪个。

public IDisposable Register<T>(Action<T> action, 
    Action<T, Exception> errorHandler) where T : IMessage
{
   return this.subject
        .OfType<T>()
        .Subscribe(t => {
            // Execute action
            try { action(t); }
            catch (Exception e) { errorHandler(t, e); }
         });
}

现在,当从 抛出异常时action,它将被传递给您的异常处理程序,而不会中断流。

上面的内容很容易被重载以提供将吞没异常的行为(这同样可能会或可能不会满足您的目的):

public IDisposable Register<T>(Action<T> action) where T : IMessage
{
    // Call the overload, don't do anything on
    // exception.
    return Register(action, (t, e) => { });
}
于 2012-11-21T14:45:53.487 回答
1

基本上,您可以在 Subscribe 方法上提供 OnError 委托。我最近在我的博客上写了一个考虑这个问题的小教程(异常处理):http: //blog.andrei.rinea.ro/2013/06/01/bing-it-on-reactive-扩展故事代码和幻灯片/

看看 OnError 方法。基本上你需要重新订阅,因为 Rx 框架是在发生错误时序列被破坏的前提下构建的。

于 2013-06-01T20:58:54.603 回答