1

Reactive Extensions 应该评估其各种运算符多少次?

我有以下测试代码:

var seconds = Observable
    .Interval(TimeSpan.FromSeconds(5))
    .Do(_ => Console.WriteLine("{0} Generated Data", DateTime.Now.ToLongTimeString()));

var split = seconds
    .Do(_ => Console.WriteLine("{0}  Split/Branch received Data", DateTime.Now.ToLongTimeString()));

var merged = seconds
    .Merge(split)
    .Do(_ => Console.WriteLine("{0}   Received Merged data", DateTime.Now.ToLongTimeString()));

var pipeline = merged.Subscribe();

我希望这将每五秒写入一次“生成的数据”。然后,它将数据传递给写入“Split/Branch received Data”的“split”流和写入“Received Merged data”的“merged”流。最后,因为“合并”流也从“拆分”流接收,所以它第二次接收数据并第二次写入“Received Merged data”。(它写其中一些的顺序并不是特别相关)

但我得到的输出是这样的:

8:29:56 AM Generated Data
8:29:56 AM Generated Data
8:29:56 AM  Split/Branch received Data
8:29:56 AM   Received Merged data
8:29:56 AM   Received Merged data
8:30:01 AM Generated Data
8:30:01 AM Generated Data
8:30:01 AM  Split/Branch received Data
8:30:01 AM   Received Merged data
8:30:01 AM   Received Merged data

它正在写入两次“生成的数据”。据我了解,订阅“秒”IObservable 的下游观察者的数量不应影响“生成数据”写入的次数(应该是 ONCE),但确实如此。为什么?

注意我在 .Net Framework 3.5 环境中使用反应式扩展的稳定版本 v1.0 SP1。

4

4 回答 4

2

据推测,他们选择这种方法是为了让每个订阅者从他们的初始订阅开始以相同的时间间隔获取它的值。考虑您的备用间隔将如何工作:

0s - First subscriber subscribes
5s - Value: 0
8s - Second subscriber subscribes
10s - Value: 1
15s - Value: 2
17s - Unsubscribe both

你最终得到的是这样的:

First  -----0----1----2-|
Second         --1----2-|

在这种情况下,两个观察者的结果明显不同,具体取决于是否已经附加了任何其他观察者。在实施时,Interval无论订单或过去的订阅者如何,都为每个订阅者提供相同的体验。

综上所述,您可以通过在创建observable时Interval添加来“转换”为您描述的行为。.Publish().RefCount()seconds

于 2012-09-07T14:32:14.667 回答
1

尽管有时如果在每一步都多播序列似乎会很好,但如果那样的话,它就不允许你拥有 Rx 允许的丰富组合。

换个角度想, IObservable就是基于推送的对偶IEnumerableIEnumerable具有惰性求值的特性 - 在您开始通过Enumerator. Rx 序列是惰性组合的,最后一个 Subscribe()(与 For-Each 等效的 Observable)实现了该序列。

通过这种方式,您只需从最后一个阶段取消订阅,即可在所有阶段停止管道,让您无需经历管理单个订阅的噩梦即可拥有火灾和忘记行为。

于 2012-09-07T16:54:14.077 回答
0

在相关的说明中,这里有一个脑筋急转弯,说明了 Asti 与懒惰评估的可枚举序列的类比:

private static Random s_rand = new Random();

public static IEnumerable<int> Rand()
{
    while (true)
        yield return s_rand.Next();
}

public static void Main()
{
    var xs = Rand();

    var res = xs.Zip(xs, (l, r) => l == r).All(b => b);

    Console.WriteLine(res);
}

如果您使用自身压缩一个随机序列,您是否希望所有元素对都相同(即导致上面的代码永远运行)?或者,您是否希望代码因某种原因终止并打印错误?

(创建类似的可观察代码留给读者作为练习。)

于 2012-09-08T02:25:24.367 回答
0

从面向对象的角度来看,考虑基于定义Observables/的接口的流是正常的Enumerables。如果您可以忽略在 Enumerator 上定义了一个名为 Reset 的便捷方法 - 从功能上讲,Enumerables 是这样的f -> g -> value?。可枚举本质上是您调用以获取枚举器的函数,该函数本质上是您不断调用的函数,直到没有更多值要返回为止。

类似地,一个 Observable 被简单地定义为f(g) -> g(h) -> h(value?)- 它是一个你提供的函数,当有一个值时你想要调用的函数。

这就是为什么将可枚举或可观察描述为除了以某种方式定义的一组函数之外的任何东西是没有意义的,因此它们可以组合 - 合同是为了确保组合计算的能力。

无论它们是实时的、缓存的还是惰性的,都是可以在其他地方抽象出来的实现细节——虽然我当然不同意这些细节很重要,但更重要的是关注它的功能性质。

作为数据库查询或目录列表的序列具有与IEnumerable预先计算的一组值(如数组)相同的接口。这取决于最终使用序列来进行区分的代码。如果您能够习惯它是一种组合高阶函数的方法,您会发现使用 Rx 或 Ix 对问题进行建模会更容易。

于 2012-09-10T15:23:19.217 回答