2

我一直很高兴通过以下方式使用 RX 在 WPF 应用程序中进行一些 API 调用:

    IDisposable disposable = _textFromEventPatternStream
        .ObserveOn(_rxConcurrencyService.Dispatcher)
        .Subscribe(async input =>
            {
                try
                {
                    IsLoading = true;
                    int x = int.Parse(input);
                    var y = await _mathApi.CalcAsync(x);
                    IsLoading = false;
                    Model.Update("", y);
                }
                catch (Exception ex)
                {
                    Model.Update(ex.Message, "Error caught in subscribe, stream continues...");
                }
                finally
                {
                    IsLoading = false;
                }
            },
            ex => Model.Update(ex.Message, "Error, stream will end..."));

但是由于各种原因,我认为我可能需要使用 SelectMany 运算符进行调用并对流进行一些处理。

我希望在 api 调用中可能会有一些错误。例如,API 端点可能不可用。API 调用之前的某些解析失败。等等。我希望 Hot Observable 继续。我还需要显示一个标准的 IsLoading 微调器。

现在我也明白了,一旦收到 OnError 序列就不应该继续。我明白这一点......我只是不喜欢它。

有了这个,问题是:使用 Retry() 是实现热可观察的正确方法吗?不管错误如何,它都可以继续运行?

下面重写的代码有效,但感觉很糟糕:

    IDisposable disposable = _textFromEventPatternStream
        .Select(input => int.Parse(input)) // simulating much heavier pre processing, leading to a possible error
        .ObserveOn(_rxConcurrencyService.Dispatcher)
        .Do(_ => IsLoading = true)
        .ObserveOn(_rxConcurrencyService.TaskPool)
        .SelectMany(inputInt => _mathApi.CalcAsync(inputInt))
        .ObserveOn(_rxConcurrencyService.Dispatcher)
        .Do(s => { },
            ex =>
            {
                // this feels like a hack.
                Model.Update(ex.Message, "Error, stream will retry...");
                IsLoading = false;
            })
        .Retry()
        .Subscribe(x => Model.Update("", x),
            ex => Model.Update(ex.Message, "Error, stream will end..."));

我看过一些代码示例,人们使用嵌套流重新订阅故障流。从我读过的内容来看,这似乎是一种常见的方法,但对我来说,这似乎将原本应该是简单的场景变成了难以理解的情况。

4

2 回答 2

2

如果它CalcAsync可能引发错误,我会尝试这个:

.SelectMany(inputInt => Observable.FromAsync(() => _mathApi.CalcAsync(inputInt)).Retry())

将重试尽可能靠近有故障的可观察对象。

我还建议某种重试计数,这样一个永久错误不会只是挂起 observable。

这是一个示例,表明这是有效的。

这失败了:

void Main()
{
    var subject = new Subject<string>();

    IDisposable disposable =
        subject
            .Select(input => int.Parse(input))
            .SelectMany(inputInt => Observable.FromAsync(() => CalcAsync(inputInt)))
            .Subscribe(x => Console.WriteLine(x));

    subject.OnNext("1");
    subject.OnNext("2");
    subject.OnNext("3");
    subject.OnNext("4");
    subject.OnNext("5");
    subject.OnNext("6");
    subject.OnCompleted();
}

private int _counter = 0;

public async Task<int> CalcAsync(int x)
{
    if (_counter++ == 3)
    {
        throw new Exception();
    }
    return await Task.Factory.StartNew(() => -x);
}

它通常输出:

-1
-2
-3
引发了“System.Exception”类型的异常。

更改SelectMany为:

.SelectMany(inputInt => Observable.FromAsync(() => CalcAsync(inputInt)).Retry())

现在我得到:

-1
-3
-2
-4
-5
-6
于 2017-10-16T11:58:01.270 回答
0

跟进,这是我们最终使用的,以防其他人有类似的情况。

我们删除了 Retry() 运算符,因为它似乎最适合 Cold Observables。

因此,考虑到最初的意图,我们希望流继续进行而不考虑错误(我们期望网络连接问题等),在我们记录错误之后,我们递归地调用在异常处理程序中进行订阅的方法,以便日志拿起异常。

它似乎运行良好,并且已经通过了我们的各种场景,包括 db 服务器关闭、api 超时异常等......

如果方法中有任何明显的缺陷,请随时发表评论......

 public void ObserveSelectedTemplateStream(IObservable<dto> textFromEventPatternStream)
            {

                _compositeDisposable.Dispose(); // clean whatever subscriptions we have
                _compositeDisposable = new CompositeDisposable();

                var disposable = textFromEventPatternStream
                .Select(x => Parse(x)) // simulating much heavier pre processing, leading to a possible error
                .ObserveOn(_rxConcurrencyService.Dispatcher)
                .Do(_ => IsLoading = true)
                .ObserveOn(_rxConcurrencyService.TaskPool)
                .SelectMany(x=> _mathApi.CalcAsync(x))
                .ObserveOn(_rxConcurrencyService.Dispatcher)                    .Subscribe(Model.Update,
                    ex => {
                        HandleException(ex);
                        ObserveSelectedTemplateStream(textFromEventPatternStream); // Recursively resubscribe to our stream. We expect errors. It's an API.
                    });

            _compositeDisposable.Add(disposable);


        }
于 2017-11-09T10:38:18.977 回答