2

我正在使用最新的Reactive Extensions并遇到了设计问题:

如果我从传递给订阅的委托中抛出异常,会发生什么?

通过源步进,我发现:

  • 受试者将忽略异常。
  • 从 Producer 派生的运算符(例如Where)在异常通过它们时处理订阅。

因此,当然,我一直发现,在任何通过标准 RX 运算符传递可观察对象的地方,任何异常都会导致我的事件由于处理而停在那里。至少,除非我重新订阅。

这让我质疑我的设计。从我的代表那里抛出异常是一个坏主意吗?显然 RX 团队是这么认为的。(虽然我会质疑默默地处理“坏”订阅是否是正确的方法。)

不过,看看我的设计,我不明白为什么会出现问题。我有一些受保护的操作正在进行,我开始触发一些 OnNext 来通知侦听器(我们已经从旧的 skool .NET 事件完全切换到 Observables),如果那里出现任何问题,它将抛出堆栈直到它遇到处理程序. 在我的例子中,处理程序回滚它正在处理的事务,这也会通知侦听器回滚。这一切都是异常安全的,工作正常。至少,如果 Dispose 在 Where 运算符的 Producer 基础中进行,它可以正常工作。

更进一步......主题和同行不这样做是否不一致?对于我们自己在此处编写的 ISubject 和 observable 运算符,我们是否应该执行相同的异常处理行为?

我期待着任何见解!

4

2 回答 2

5

有问题时抛出异常。当你认为你可以处理他们指出的问题时,抓住他们。异常处理的语言支持假设您有一个函数调用堆栈,因此异常只是爬上堆栈寻找处理程序。

但请记住,在使用 Rx 时,处理模型已被侧身。您不再有一个深度函数调用堆栈,顶部是源(调用代码),底部是观察者(调用代码)。所以你不能依赖语言在你的 Rx 流中做正确的事情。

如果你的回调抛出一个异常,那么 Rx 倾向于捕获该异常并将其通过OnErrorobservable 的通道向下传递。如果您在订阅时没有提供OnError处理程序,那么 Rx 往往会在后台线程上引发异常,从而在您的应用程序中生成未处理的异常。

Rx 不会通知数据源下游发生了异常。这是因为数据源与数据消费者完全隔离。它可能不在同一进程上,甚至不在同一台机器上,也可能不在同一语言中。这就是为什么Subject不做任何事情的原因。在这种Subject情况下,它充当数据源,并不真正关心观察者做什么。

正如您所注意到的,未捕获的异常将导致 Rx 默认取消订阅观察者的观察者。这是快速失败的理念,也是 Rx 可以做出的唯一安全假设。如果你的观察者提出了一个异常,那么一定是有问题,我们不应该默认给它更多的数据。Rx 提供了一些机制,允许您以不一定取消订阅 observable 的方式处理异常。

一种方法是将您的异常转换为数据:

// instead of:
source.Where(foo => predicateThatMightThrowException(foo)).Subscribe(foo => ..., error => ...)

// do:
source.Select(foo =>
{
    try { return new { foo: foo, filter: predicateThatMightThrowException(foo), error: (Exception)null }; }
    catch (Exception e) { return { foo: foo, filter: true, error: e } };
})
.Where(f => f.filter)
.Subscribe(f =>
{
    if (f.error != null) { handle error }
    else { handle f.foo }
});

其他方式涉及使用CatchExceptionor Retry。Rxx 库有一些与左/右通道配对的 observables 和一种Either类型,您可以使用它来在“左侧”流式传输您的良好数据和在“右侧”流式传输您的错误。

于 2013-10-11T15:51:35.670 回答
2

好的,我在源代码中找到了答案。将简单地将其粘贴在这里以供后代使用。

// Safeguarding of the pipeline against rogue observers is required for proper
// resource cleanup. Consider the following example:
//
//   var xs  = Observable.Interval(TimeSpan.FromSeconds(1));
//   var ys  = <some random sequence>;
//   var res = xs.CombineLatest(ys, (x, y) => x + y);
//
// The marble diagram of the query above looks as follows:
//
//   xs  -----0-----1-----2-----3-----4-----5-----6-----7-----8-----9---...
//                  |     |     |     |     |     |     |     |     |
//   ys  --------4--+--5--+-----+--2--+--1--+-----+-----+--0--+-----+---...
//               |  |  |  |     |  |  |  |  |     |     |  |  |     |
//               v  v  v  v     v  v  v  v  v     v     v  v  v     v
//   res --------4--5--6--7-----8--5--6--5--6-----7-----8--7--8-----9---...
//                                 |
//                                @#&
//
// Notice the free-threaded nature of Rx, where messages on the resulting sequence
// are produced by either of the two input sequences to CombineLatest.
//
// Now assume an exception happens in the OnNext callback for the observer of res,
// at the indicated point marked with @#& above. The callback runs in the context
// of ys, so the exception will take down the scheduler thread of ys. This by
// itself is a problem (that can be mitigated by a Catch operator on IScheduler),
// but notice how the timer that produces xs is kept alive.

所以答案是:是的,在 OnNext 中抛出异常是个坏主意。一般来说。在我的具体情况下,我知道这没关系,所以我会找到另一种方法。

于 2013-10-11T03:16:26.943 回答