0

谁能告诉我是什么导致以下代码中出现这种看似奇怪的 Do 行为?我希望每个 OnNext 调用一次 Do 处理程序。

using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using NUnit.Framework;

[TestFixture]
public class WhyDoesDoActSoWierd
{
    [Test]
    public void ButWhy()
    {
        var doCount = 0;
        var observable = new Subject<int>();
        var stream = observable.Do( x => doCount++ );
        var subs =
            (from x in stream
             where x % 2 == 1
             from y in stream
             where y % 2 == 0
                   && y == x + 1
             select new { x, y })
                .Subscribe( x => Console.WriteLine( "{0}, {1}", x.x, x.y ) );

        observable.OnNext( 1 );
        // doCount == 1
        observable.OnNext( 2 );
        // doCount == 3
        observable.OnNext( 3 );
        // doCount == 5
        observable.OnNext( 4 );
        // doCount == 8
        observable.OnNext( 5 );
        // doCount == 11
        observable.OnNext( 6 );
        // doCount == 15
        Assert.AreEqual( 6, doCount );
    }
}
4

2 回答 2

3

这里的行为是完全正常的。原因是您不仅有一个订阅,还有很多订阅。并且由于查询中两个可观察对象之间的笛卡尔积,您拥有的数量Do比您预期的要多。

让我们看一下您的替代(但类似)查询。

    var doCountX = 0;
    var doCountY = 0;

    Action dump = () =>
        Console.WriteLine("doCountX = {0}, doCountY = {1}", doCountX, doCountY);

    var observable = new Subject<int>();

    var streamX = observable.Do(x => doCountX++);
    var streamY = observable.Do(x => doCountY++);

    var query =
        from x in streamX
        from y in streamY
        select new { x, y };

    query.Subscribe(z => Console.WriteLine("{0}, {1}", z.x, z.y));

    dump();
    for (var i = 1; i <= 6; i++)
    {
        observable.OnNext(i);
        dump();
    }

输出是:

doCountX = 0, doCountY = 0
doCountX = 1, doCountY = 0
1, 2
doCountX = 2, doCountY = 1
1, 3
2, 3
doCountX = 3, doCountY = 3
1, 4
2, 4
3, 4
doCountX = 4, doCountY = 6
1, 5
2, 5
3, 5
4, 5
doCountX = 5, doCountY = 10
1, 6
2, 6
3, 6
4, 6
5, 6
doCountX = 6, doCountY = 15

doCountX = 0, doCountY = 0由于此调用发生dump()在对OnNext.

但是当我们第一次调用时,OnNext我们没有得到查询产生的值,因为第二个streamYobservable 还没有被订阅。

只有当OnNext第二次被调用时,我们才会从查询中得到一个值,该值恰好是第一个OnNext值与第二个值配对。现在这也创建了一个新的订阅streamY,等待下一个值。

所以我们现在已经从streamX等待序列中的下一个值中得到了前两个值。所以当OnNext(3)被调用时,我们得到两个结果。

每次发生这种情况时,您都可以看到Do呼叫数量doCountY不断增加。

事实上,给定这个非常简单SelectMany的查询,公式是:

doCountY = n * (n - 1) / 2

因此,通过OnNext您产生的 6 个值doCountY等于6 * 5 / 215

运行 10 个值给出10 * 9 / 245值。

因此SelectMany,事实上 a 的订阅量比您想象的要多得多。这通常是为什么您通常只使用它来将每个只产生一个值的可观察对象链接在一起,以防止订阅呈指数增长。

现在有意义吗?

于 2012-10-26T10:50:28.987 回答
1

作为 Enigmativity 充实的答案的旁白,这里发生了两件事:

  1. 像 Enumerables 这样的 Observable 是懒惰地组成的。就像您不希望在遍历枚举器之前评估可枚举查询中的任何组合器一样,您应该期望消费者将看到为该管道评估的所有组合器,其次数与订阅管道的观察者数量一样多. 简而言之,x <- stream, y <- stream已经成功了两次。

  2. 理解改写为:

        stream1.Where(x => x % 2 == 1)
               .SelectMany(x => stream2
                                .Where(y => y % 2 == 0 && y == x + 1)
                                .Select(y => new { x, y })
                                );
    

    对于收到的每个 x 值,您将订阅流中与谓词匹配的所有值 - 这变得很多。查询理解通常被取消糖化为SelectMany// Join-GroupBy您实际上最终会使用的大部分 Rx 可能会更好地用其他运算符表示 - 例如MergeorZip甚至Join.

有一个问题与您刚刚提出的问题非常相似: Reactive Extensions 是否评估了太多次?

关于为什么这是 Rx 中的预期行为,有一些讨论。

于 2012-10-26T12:22:45.577 回答