13

我只想学习两者以及如何一起使用它们。我知道它们可以相互补充,我只是找不到有人实际这样做的例子。

4

1 回答 1

21

让我从一些背景开始。

.NET 框架有许多特殊类型——适当的类或接口—— 、、、、、等等Task<T>——它们为基础类型提供了特殊的能力。IObservable<T>Nullable<T>IEnumerable<T>Lazy<T>T

TPL 用于Task<T>表示 的单个值的异步计算T

Rx 用于IObservable<T>表示 的零个或多个值的异步计算T

正是这两者的“异步计算”方面将 TPL 和 Rx 结合在一起。

现在,TPL 也使用该类型Task来表示Actionlambda 的异步执行,但这可以被认为是Task<T>where Tis的一种特殊情况void。非常像 c# 中的标准方法,返回void如下:

public void MyMethod() { }

Rx 还允许使用称为 的特殊类型来处理相同的特殊情况Unit

TPL 和 Rx 之间的区别在于返回值的数量。TPL 是一且只有一,而 Rx 是零或更多。

因此,如果您通过仅使用返回单个值的可观察序列以特殊方式处理 Rx,您可以以与 TPL 类似的方式进行一些计算。

例如,在 TPL 中我可以写:

Task.Factory
    .StartNew(() => "Hello")
    .ContinueWith(t => Console.WriteLine(t.Result));

在 Rx 中,等价于:

Observable
    .Start(() => "Hello")
    .Subscribe(x => Console.WriteLine(x));

通过指定应该使用 TPL 执行计算,我可以在 Rx 中更进一步,如下所示:

Observable
    .Start(() => "Hello", Scheduler.TaskPool)
    .Subscribe(x => Console.WriteLine(x));

(默认情况下使用线程池。)

现在我可以做一些“混合和匹配”。如果我添加对System.Reactive.Threading.Tasks命名空间的引用,我可以很容易地在任务和可观察对象之间移动。

Task.Factory
    .StartNew(() => "Hello")
    .ToObservable()
    .Subscribe(x => Console.WriteLine(x));

Observable
    .Start(() => "Hello")
    .ToTask()
    .ContinueWith(t => Console.WriteLine(t.Result));

注意ToObservable()&.ToTask()调用以及从一个库到另一个库的翻转。

如果我有一个返回多个值的 observable,我可以使用 observable.ToArray()扩展方法将多个序列值转换为可以转换为任务的单个数组值。像这样:

Observable
    .Interval(TimeSpan.FromSeconds(1.0))
    .Take(5) // is IObservable<long>
    .ToArray()
    .ToTask() // is Task<long[]>
    .ContinueWith(t => Console.WriteLine(t.Result.Length));

我认为这是对您问题的一个相当基本的答案。这是你所期待的吗?

于 2012-07-10T12:56:23.000 回答