我认为使用立即返回的 observable以及使用 rasx 在评论中所做的 async/await 语法会使事情过于混乱。
让我们创建一个包含 5 个元素的流,这些元素每秒返回一个然后完成:
private IObservable<long> StreamWith5Elements()
{
return Observable.Interval(TimeSpan.FromSeconds(1))
.Take(5);
}
我们可以使用 async/await 魔法来调用它,就像在这个LINQPad友好示例中一样:
void Main()
{
CountExampleAsync().Wait();
}
private async Task CountExampleAsync()
{
int result = await StreamWith5Elements().Count();
Console.WriteLine(result);
}
但它会误导这里发生的事情 -Count()
返回 a IObservable<int>
,但 Rx 对结果流非常友好,await
并将结果流转换为 a Task<int>
- 然后 await 然后交回该任务的int
结果。
当您对 a 使用 await 时IObservable<T>
,您隐含地说您希望该 observableOnNext()
使用单个结果调用然后调用OnComplete()
. 实际发生的是,您将获得一个Task<T>
返回在流终止之前发送的最后一个值。(类似于AsyncSubject<T>
行为方式)。
这很有用,因为这意味着任何流都可以映射到 a Task
,但确实需要仔细考虑。
现在,上面的例子等价于下面更传统的 Rx:
void Main()
{
PlainRxCountExample();
}
private void PlainRxCountExample()
{
IObservable<int> countResult = StreamWith5Elements().Count();
countResult.Subscribe(count => Console.WriteLine(count));
/* block until completed for the sake of the example */
countResult.Wait();
}
在这里您可以看到它Count()
确实返回了一个 int 流 - 以提供异步计数。它只会在源流完成时返回。
在 Rx 的早期,Count() 实际上是同步的。
然而,这并不是一个非常有用的事态,因为它“退出 Monad”——即让你脱离IObservable<T>
并阻止你进一步与 Rx 运算符组合。
一旦你开始“在流中思考”,Count() 的异步特性确实非常直观,因为当然你只能在流完成时提供流的计数 - 为什么要为此徘徊?:)