我有以下设置
IObservable<Data> source = ...;
source
.Select(data=>VeryExpensiveOperation(data))
.Subscribe(data=>Console.WriteLine(data));
通常,这些事件会在合理的时间范围内分开。想象一下用户更新表单中的文本框。我们VeryExpensiveOperation
可能需要 5 秒钟才能完成,同时屏幕上会显示一个沙漏。
但是,如果在 5 秒内用户再次更新文本框,我想VeryExpensiveOperation
在新的开始之前向当前发送取消。
我会想象这样的场景
source
.SelectWithCancel((data, cancelToken)=>VeryExpensiveOperation(data, token))
.Subscribe(data=>Console.WriteLine(data));
因此,每次调用 lambda 时都会使用可用于管理取消的 cancelToken 来调用Task
. 但是现在我们混合了 Task、CancelationToken 和 RX。不太确定如何将它们组合在一起。有什么建议么。
弄清楚如何使用 XUnit 测试操作员的奖励积分:)
第一次尝试
public static IObservable<U> SelectWithCancelation<T, U>( this IObservable<T> This, Func<CancellationToken, T, Task<U>> fn )
{
CancellationTokenSource tokenSource = new CancellationTokenSource();
return This
.ObserveOn(Scheduler.Default)
.Select(v=>{
tokenSource.Cancel();
tokenSource=new CancellationTokenSource();
return new {tokenSource.Token, v};
})
.SelectMany(o=>Observable.FromAsync(()=>fn(o.Token, o.v)));
}
尚未测试。我希望一个未完成的任务会生成一个 IObservable,它在不触发任何OnNext
事件的情况下完成。