0

有没有一个例子向我展示了它的Observable.Count<TSource> Method实际工作原理?我提出的示例似乎返回了一个包含在 observable 中的计数,而不是预期的计数。

例如,我希望1从这里返回:

System.Diagnostics.Debug.WriteLine((Observable.Return<string>("Hello world!")).Count());

以后会1返回(因为毕竟是异步序列)?还是我错过了一些基本的东西?在撰写本文时,我实际上假设只要结果被推出,它.Count()就会返回结果并随着时间的推移而增长。T真的吗?是的。

4

2 回答 2

5

Rx 中的聚合运算符的工作方式与 LINQ 中的稍有不同——它们不会立即返回值,而是返回未来的结果(即,我们只能在 Observable 完成后知道最终的 Count 是多少)。

所以如果你写:

Observable.Return("foo").Count().Subscribe(x => Console.WriteLine(x));
>>> 1

因为毕竟它是一个异步序列

这实际上并不完全正确。在这里,只要有人调用,一切都会立即运行Subscribe。上面这段代码没有任何异步,没有额外的线程,一切都发生在订阅上。

于 2013-11-04T23:08:01.557 回答
4

我认为使用立即返回的 observable以及使用 rasx 在评论中所做的 async/await 语法会使事情过于混乱。

让我们创建一个包含 5 个元素的流,这些元素每秒返回一个然后完成:

private IObservable<long> StreamWith5Elements()
{
    return Observable.Interval(TimeSpan.FromSeconds(1))
                     .Take(5);
}

我们可以使用 async/await 魔法来调用它,就像在这个LINQPad友好示例中一样:

void Main()
{
    CountExampleAsync().Wait();
}

private async Task CountExampleAsync()
{
    int result = await StreamWith5Elements().Count();
    Console.WriteLine(result);
}

但它会误导这里发生的事情 -Count()返回 a IObservable<int>,但 Rx 对结果流非常友好,await并将结果流转换为 a Task<int>- 然后 await 然后交回该任务的int结果。

当您对 a 使用 await 时IObservable<T>,您隐含地说您希望该 observableOnNext()使用单个结果调用然后调用OnComplete(). 实际发生的是,您将获得一个Task<T>返回在流终止之前发送的最后一个值。(类似于AsyncSubject<T>行为方式)。

这很有用,因为这意味着任何流都可以映射到 a Task,但确实需要仔细考虑。

现在,上面的例子等价于下面更传统的 Rx:

void Main()
{
    PlainRxCountExample();
}

private void PlainRxCountExample()
{
    IObservable<int> countResult = StreamWith5Elements().Count();
    countResult.Subscribe(count => Console.WriteLine(count));

    /* block until completed for the sake of the example */
    countResult.Wait();
}

在这里您可以看到它Count()确实返回了一个 int 流 - 以提供异步计数。它只会在源流完成时返回。

在 Rx 的早期,Count() 实际上是同步的。

然而,这并不是一个非常有用的事态,因为它“退出 Monad”——即让你脱离IObservable<T>并阻止你进一步与 Rx 运算符组合。

一旦你开始“在流中思考”,Count() 的异步特性确实非常直观,因为当然你只能在流完成时提供流的计数 - 为什么要为此徘徊?:)

于 2013-11-04T23:35:15.997 回答