11

我的情况是,我有一个正在处理的任务列表(启用驱动器、更改位置、等待停止、禁用)。

'wait for' 监视IObservable<Status>我想等待的一个 (所以我可以通过线程ContinueWith和其他任务)。

我开始在订阅者的 OnNext 处理中执行以下任务,但这很丑陋。我现在想出的是这种扩展方法:

public static Task<T> WaitFor<T>(this IObservable<T> source, Func<T, bool> pred)
{
    var tcs = new TaskCompletionSource<T>();
    source
        .Where(pred)
        .DistinctUntilChanged()
        .Take(1)  //OnCompletes the observable, subscription will self-dispose
        .Subscribe(val => tcs.TrySetResult(val),
                    ex => tcs.TrySetException(ex),
                    () => tcs.TrySetCanceled());

    return tcs.Task;
}

(更新了svick的处理建议OnCompletedOnError

问题:

  • 这是好事、坏事还是丑陋?
  • 我是否错过了可以做到这一点的现有扩展?
  • Where和的顺序是否DistinctUntilChanged正确?(我认为他们是)
4

2 回答 2

13

至少我会将此扩展方法更改为:

public static Task<T> WaitFor<T>(this IObservable<T> source, Func<T, bool> pred)
{
    return
        source
            .Where(pred)
            .DistinctUntilChanged()
            .Take(1)
            .ToTask();
}

使用.ToTask()比引入要好得多TaskCompletionSource。您需要对System.Reactive.Threading.Tasks命名空间的引用才能获取.ToTask()扩展方法。

此外,DistinctUntilChanged在此代码中是多余的。您只能获得一个值,因此默认情况下它必须是不同的。

现在,我的下一个建议可能有点争议。这个扩展是一个坏主意,因为它隐藏了正在发生的事情的真正语义。

如果我有这两个代码片段:

var t = xs.WaitFor(x => x > 10);

或者:

var t = xs.Where(x => x > 10).Take(1).ToTask();

我通常更喜欢第二个片段,因为它清楚地向我展示了正在发生的事情 - 我不需要记住WaitFor.

除非您将名称命名为WaitFor更具描述性——也许TakeOneAsTaskWhere——否则您将清楚地从使用它的代码中使用运算符,并使代码更难管理。

以下内容不是更容易记住语义吗?

var t = xs.TakeOneAsTaskWhere(x => x > 10);

对我来说,底线是 Rx 运算符是组合的,而不是封装的,但如果你要封装它们,那么它们的含义必须清楚。

我希望这有帮助。

于 2012-07-30T09:25:59.363 回答
3

对此不是 100% 确定,但通过阅读 Rx 2.0 beta 博客文章,我认为如果您可以使用 async/await,您可以“返回 await source.FirstAsync(pred)”或不使用异步,“return source.FirstAsync (pred).ToTask()"

http://blogs.msdn.com/b/rxteam/archive/2012/03/12/reactive-extensions-v2-0-beta-available-now.aspx

使用 rx 2.0 和 firstasync 拍摄 linqpad

于 2012-07-30T14:11:09.180 回答