为了精简 Rx,我正在编写一个模拟读取文件并支持取消的简单应用程序。
它是这样工作的:
当用户按下“运行”按钮时,程序
- 显示忙碌指示器,
- 分块读取文件,这是一个漫长的过程,报告每个文件的长度,
- 当读取过程完成(或取消,见下文)时隐藏忙碌指示符。
如果用户按下“取消”按钮,文件读取过程将被取消。
- 如果用户在前一个操作仍在进行时按下“运行”按钮,则前一个操作将被取消并开始新的操作。
我正在寻找一种用最少的胶水代码来实现这一点的方法,同时仍然保持简单的同步操作序列的外观,其中很容易看到自然控制流。
主要问题似乎是当内部可观察对象(阅读器)完成时隐藏忙碌指示器,外部可观察对象是“运行”点击事件序列。
到目前为止,我想出了程序主要部分的这两个版本:
基于订阅者:
IObservable<byte[]> xs =
runClick
.Do((Action<EventPattern<EventArgs>>)(_ => ShowBusy(true)))
.Do(_ => ShowError(false))
.Select(_ =>
reader
.TakeUntil(cancelClick)
.Publish(ob =>
{
ob.Subscribe(
b => { },
ex =>
{
Console.WriteLine("ERROR: " + ex);
ShowBusy(false);
ShowError(true);
},
() => ShowBusy(false));
return ob;
}))
.Switch();
xs = xs.Catch<byte[], Exception>(e => xs);
和基于 concat:
IObservable<byte[]> xs =
runClick
.Do((Action<EventPattern<EventArgs>>)(_ => ShowBusy(true)))
.Do(_ => ShowError(false))
.Select(_ =>
reader
.TakeUntil(cancelClick)
.DoAfter(() =>
ShowBusy(false)))
.Switch();
xs = xs.Catch<byte[], Exception>(e =>
{
Console.WriteLine("ERROR: " + e);
ShowBusy(false);
ShowError(true);
return xs;
});
使用一种自定义方法DoAfter
public static IObservable<T> DoAfter<T>(this IObservable<T> observable, Action action)
{
IObservable<T> appended = observable.Concat(
Observable.Create<T>(o =>
{
try
{
action();
}
catch (Exception ex)
{
o.OnError(ex);
return Disposable.Empty;
}
o.OnCompleted();
return Disposable.Empty;
}));
return appended;
}
(在此处查看整个应用程序代码)。
两种实现都不完全让我满意:第一个相当冗长,第二个迫使我为一个相当简单的任务编写自己的方法,我希望通过标准方法来完成。我错过了什么吗?