您必须使用任务吗?
如果您乐于纯粹使用 Observables,那么您可以自己很好地完成这项工作。
尝试做这样的事情:
var query =
Observable.Create<int>(o =>
{
var cancelling = false;
var cancel = Disposable.Create(() =>
{
cancelling = true;
});
var subscription = Observable.Start(() =>
{
for (var i = 0; i < 100; i++)
{
Thread.Sleep(10); //1000 ms in total
if (cancelling)
{
Console.WriteLine("Cancelled on {0}", i);
return -1;
}
}
Console.WriteLine("Done");
return 42;
}).Subscribe(o);
return new CompositeDisposable(cancel, subscription);
});
这个 observable 在 for 循环中使用 做一些艰苦的工作Thread.Sleep(10);
,但是当 observable 被处理时,循环退出并且密集的 CPU 工作停止。然后你可以使用标准的 RxDispose
来Switch
取消正在进行的工作。
如果您希望将其捆绑在一个方法中,请尝试以下操作:
public static IObservable<T> Start<T>(Func<Func<bool>, T> work)
{
return Observable.Create<T>(o =>
{
var cancelling = false;
var cancel = Disposable
.Create(() => cancelling = true);
var subscription = Observable
.Start(() => work(() => cancelling))
.Subscribe(o);
return new CompositeDisposable(cancel, subscription);
});
}
然后用这样的函数调用它:
Func<Func<bool>, int> work = cancelling =>
{
for (var i = 0; i < 100; i++)
{
Thread.Sleep(10); //1000 ms in total
if (cancelling())
{
Console.WriteLine("Cancelled on {0}", i);
return -1;
}
}
Console.WriteLine("Done");
return 42;
};
这是我证明这有效的代码:
var disposable =
ObservableEx
.Start(work)
.Subscribe(x => Console.WriteLine(x));
Thread.Sleep(500);
disposable.Dispose();
我的输出是“Cancelled on 50”(有时是“Cancelled on 51”)。