我一直很高兴通过以下方式使用 RX 在 WPF 应用程序中进行一些 API 调用:
IDisposable disposable = _textFromEventPatternStream
.ObserveOn(_rxConcurrencyService.Dispatcher)
.Subscribe(async input =>
{
try
{
IsLoading = true;
int x = int.Parse(input);
var y = await _mathApi.CalcAsync(x);
IsLoading = false;
Model.Update("", y);
}
catch (Exception ex)
{
Model.Update(ex.Message, "Error caught in subscribe, stream continues...");
}
finally
{
IsLoading = false;
}
},
ex => Model.Update(ex.Message, "Error, stream will end..."));
但是由于各种原因,我认为我可能需要使用 SelectMany 运算符进行调用并对流进行一些处理。
我希望在 api 调用中可能会有一些错误。例如,API 端点可能不可用。API 调用之前的某些解析失败。等等。我希望 Hot Observable 继续。我还需要显示一个标准的 IsLoading 微调器。
现在我也明白了,一旦收到 OnError 序列就不应该继续。我明白这一点......我只是不喜欢它。
有了这个,问题是:使用 Retry() 是实现热可观察的正确方法吗?不管错误如何,它都可以继续运行?
下面重写的代码有效,但感觉很糟糕:
IDisposable disposable = _textFromEventPatternStream
.Select(input => int.Parse(input)) // simulating much heavier pre processing, leading to a possible error
.ObserveOn(_rxConcurrencyService.Dispatcher)
.Do(_ => IsLoading = true)
.ObserveOn(_rxConcurrencyService.TaskPool)
.SelectMany(inputInt => _mathApi.CalcAsync(inputInt))
.ObserveOn(_rxConcurrencyService.Dispatcher)
.Do(s => { },
ex =>
{
// this feels like a hack.
Model.Update(ex.Message, "Error, stream will retry...");
IsLoading = false;
})
.Retry()
.Subscribe(x => Model.Update("", x),
ex => Model.Update(ex.Message, "Error, stream will end..."));
我看过一些代码示例,人们使用嵌套流重新订阅故障流。从我读过的内容来看,这似乎是一种常见的方法,但对我来说,这似乎将原本应该是简单的场景变成了难以理解的情况。