我的情况是,我有一个正在处理的任务列表(启用驱动器、更改位置、等待停止、禁用)。
'wait for' 监视IObservable<Status>
我想等待的一个 (所以我可以通过线程ContinueWith
和其他任务)。
我开始在订阅者的 OnNext 处理中执行以下任务,但这很丑陋。我现在想出的是这种扩展方法:
public static Task<T> WaitFor<T>(this IObservable<T> source, Func<T, bool> pred)
{
var tcs = new TaskCompletionSource<T>();
source
.Where(pred)
.DistinctUntilChanged()
.Take(1) //OnCompletes the observable, subscription will self-dispose
.Subscribe(val => tcs.TrySetResult(val),
ex => tcs.TrySetException(ex),
() => tcs.TrySetCanceled());
return tcs.Task;
}
(更新了svick的处理建议OnCompleted
和OnError
)
问题:
- 这是好事、坏事还是丑陋?
- 我是否错过了可以做到这一点的现有扩展?
Where
和的顺序是否DistinctUntilChanged
正确?(我认为他们是)