12

OnNext我使用ObserveOn(Scheduler.ThreadPool). 我发现处理这个问题的唯一方法是使用下面的自定义扩展方法(除了确保 OnNext 永远不会引发异常)。然后确保每个ObserveOn后跟一个ExceptionToError.

    public static IObservable<T> ExceptionToError<T>(this IObservable<T> source) {
        var sub = new Subject<T>();
        source.Subscribe(i => {
            try {
                sub.OnNext(i);
            } catch (Exception err) {
                sub.OnError(err);
            }
        }
            , e => sub.OnError(e), () => sub.OnCompleted());
        return sub;
    }

但是,这感觉不对。有没有更好的方法来处理这个问题?

例子

该程序由于未捕获的异常而崩溃。

class Program {
    static void Main(string[] args) {
        try {
            var xs = new Subject<int>();

            xs.ObserveOn(Scheduler.ThreadPool).Subscribe(x => {
                Console.WriteLine(x);
                if (x % 5 == 0) {
                    throw new System.Exception("Bang!");
                }
            }, ex => Console.WriteLine("Caught:" + ex.Message)); // <- not reached

            xs.OnNext(1);
            xs.OnNext(2);
            xs.OnNext(3);
            xs.OnNext(4);
            xs.OnNext(5);
        } catch (Exception e) {
            Console.WriteLine("Caught : " + e.Message); // <- also not reached
        } finally {

            Console.ReadKey();
        }
    }
}
4

5 回答 5

14

我们在 Rx v2.0 中解决了这个问题,从 RC 版本开始。您可以在我们的博客http://blogs.msdn.com/rxteam上阅读所有相关信息。它基本上归结为管道本身中更严格的错误处理,结合了 SubscribeSafe 扩展方法(将订阅期间的错误重定向到 OnError 通道)和 IScheduler 上的 Catch 扩展方法(将调度程序与异常处理逻辑包装在调度行动)。

关于这里提出的 ExceptionToError 方法,它有一个缺陷。回调运行时,IDisposable 订阅对象仍然可以为空;有一个基本的竞争条件。要解决此问题,您必须使用 SingleAssignmentDisposable。

于 2012-06-25T09:16:45.150 回答
6

订阅错误和可观察错误之间存在差异。快速测试:

var xs = new Subject<int>();

xs.Subscribe(x => { Console.WriteLine(x); if (x % 3 == 0) throw new System.Exception("Error in subscription"); }, 
             ex => Console.WriteLine("Error in source: " + ex.Message));

运行这个,你会在源代码中得到一个很好的处理错误:

xs.OnNext(1);
xs.OnNext(2);
xs.OnError(new Exception("from source"));

运行此命令,您将在订阅中收到未处理的错误:

xs.OnNext(1);
xs.OnNext(2);
xs.OnNext(3);

您的解决方案所做的是在订阅中出错并在源中使它们出错。而且您已经在原始流上完成了此操作,而不是基于每个订阅。您可能打算也可能不打算这样做,但这几乎肯定是错误的。

做到这一点的“正确”方法是将您需要的错误处理直接添加到它所属的订阅操作中。如果不想直接修改订阅功能,可以使用一个小帮手:

public static Action<T> ActionAndCatch<T>(Action<T> action, Action<Exception> catchAction)
{
    return item =>
    {
        try { action(item); }
        catch (System.Exception e) { catchAction(e); }
    };
}

现在使用它,再次显示不同错误之间的区别:

xs.Subscribe(ActionAndCatch<int>(x => { Console.WriteLine(x); if (x % 3 == 0) throw new System.Exception("Error in subscription"); },
                                 ex => Console.WriteLine("Caught error in subscription: " + ex.Message)),
             ex => Console.WriteLine("Error in source: " + ex.Message));

现在我们可以(分别)处理源中的错误和订阅中的错误。当然,这些动作中的任何一个都可以在一个方法中定义,使上面的代码很简单(可能):

xs.Subscribe(ActionAndCatch(Handler, ExceptionHandler), SourceExceptionHandler);

编辑

在评论中,我们开始讨论订阅中的错误指向流本身的错误,并且您不希望该流上的其他订阅者。这是一个完全不同类型的问题。我倾向于编写一个可观察的Validate扩展来处理这种情况:

public static IObservable<T> Validate<T>(this IObservable<T> source, Predicate<T> valid)
{
    return Observable.Create<T>(o => {
        return source.Subscribe(
            x => {
                if (valid(x)) o.OnNext(x);
                else       o.OnError(new Exception("Could not validate: " + x));
            }, e => o.OnError(e), () => o.OnCompleted()
        );
    });
}

然后简单易用,无需混合隐喻(仅在源代码中出现错误):

xs
.Validate(x => x != 3)
.Subscribe(x => Console.WriteLine(x),
             ex => Console.WriteLine("Error in source: " + ex.Message));

如果您仍然想要抑制异常,Subscribe您应该使用其他讨论的方法之一。

于 2012-06-25T03:07:33.753 回答
4

您当前的解决方案并不理想。正如这里的一位 Rx 人所说:

Rx 运算符不会捕获在调用 OnNext、OnError 或 OnCompleted 时发生的异常。这是因为我们期望 (1) 观察者实现者最清楚如何处理这些异常,我们不能对它们做任何合理的事情; (2) 如果发生异常,那么我们希望它冒泡而不被 Rx 处理.

您当前的解决方案让 IObservable 来处理 IObserver 抛出的错误,这没有意义,因为语义上 IObservable 应该不知道观察它的事物。考虑以下示例:

var errorFreeSource = new Subject<int>();
var sourceWithExceptionToError = errorFreeSource.ExceptionToError();
var observerThatThrows = Observer.Create<int>(x =>
  {
      if (x % 5 == 0)
          throw new Exception();
  },
  ex => Console.WriteLine("There's an argument that this should be called"),
  () => Console.WriteLine("OnCompleted"));
var observerThatWorks = Observer.Create<int>(
    x => Console.WriteLine("All good"),
    ex => Console.WriteLine("But definitely not this"),
    () => Console.WriteLine("OnCompleted"));
sourceWithExceptionToError.Subscribe(observerThatThrows);
sourceWithExceptionToError.Subscribe(observerThatWorks);
errorFreeSource.OnNext(1);
errorFreeSource.OnNext(2);
errorFreeSource.OnNext(3);
errorFreeSource.OnNext(4);
errorFreeSource.OnNext(5);
Console.ReadLine();

此处源或observerThatWorks没有问题,但由于与另一个Observer无关的错误,将调用其OnError。要阻止不同线程中的异常结束进程,您必须在该线程中捕获它们,因此在您的观察者中放置一个 try/catch 块。

于 2012-06-25T04:31:05.863 回答
1

我查看了SubscribeSafe应该解决此问题的本机方法,但我无法使其工作。此方法有一个接受 的重载IObserver<T>

// Subscribes to the specified source, re-routing synchronous exceptions during
// invocation of the IObservable<T>.Subscribe(IObserver<T>) method to the
// observer's IObserver<T>.OnError(Exception) channel. This method is typically
// used when writing query operators.
public static IDisposable SubscribeSafe<T>(this IObservable<T> source,
    IObserver<T> observer);

我尝试传递由工厂方法创建的观察者Observer.Create,但onNext处理程序中的异常继续使进程崩溃¹,就像它们对正常的Subscribe. 所以我最终编写了自己的SubscribeSafe. onNext这个接受三个处理程序作为参数,并将由和onCompleted处理程序抛出的任何异常集中到onError处理程序。

/// <summary>Subscribes an element handler, an error handler, and a completion
/// handler to an observable sequence. Any exceptions thrown by the element or
/// the completion handler are propagated through the error handler.</summary>
public static IDisposable SubscribeSafe<T>(this IObservable<T> source,
    Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
    // Arguments validation omitted
    var disposable = new SingleAssignmentDisposable();
    disposable.Disposable = source.Subscribe(
        value =>
        {
            try { onNext(value); } catch (Exception ex) { onError(ex); disposable.Dispose(); }
        }, onError, () =>
        {
            try { onCompleted(); } catch (Exception ex) { onError(ex); }
        }
    );
    return disposable;
}

请注意,处理程序中未处理的异常onError仍会使进程崩溃!

¹只有在ThreadPool.

于 2020-11-30T07:52:41.157 回答
-1

你是对的 - 它应该感觉不好。像这样使用和返回主题不是一个好方法。

至少你应该像这样实现这个方法:

public static IObservable<T> ExceptionToError<T>(this IObservable<T> source)
{
    return Observable.Create<T>(o =>
    {
        var subscription = (IDisposable)null;
        subscription = source.Subscribe(x =>
        {
            try
            {
                o.OnNext(x);
            }
            catch (Exception ex)
            {
                o.OnError(ex);
                subscription.Dispose();
            }
        }, e => o.OnError(e), () => o.OnCompleted());
        return subscription;
    });
}

请注意,没有使用任何主题,并且如果我发现错误,然后我会处理订阅以防止序列继续超过错误。

但是,为什么不在OnError订阅中添加一个处理程序。有点像这样:

var xs = new Subject<int>();

xs.ObserveOn(Scheduler.ThreadPool).Subscribe(x =>
{
    Console.WriteLine(x);
    if (x % 5 == 0)
    {
        throw new System.Exception("Bang!");
    }
}, ex => Console.WriteLine(ex.Message));

xs.OnNext(1);
xs.OnNext(2);
xs.OnNext(3);
xs.OnNext(4);
xs.OnNext(5);

此代码在订阅中正确捕获错误。

另一种方法是使用Materialize扩展方法,但这可能有点矫枉过正,除非上述解决方案不起作用。

于 2012-06-25T01:11:45.553 回答