2

我正在尝试扩展我对 Rx 的了解。所以我只是在玩流,并试图让它们表现得像我期望的那样。

虽然我已经阅读过,Repeat() 运算符在实践中存在困难,因为您可能会在 OnCompleted 和重新订阅之间丢失通知,但我自己无法弄清楚为什么会发生以下情况。

        var subject = new Subject<string>();

        var my = subject
            .Take(1)
            .Merge(Observable.Empty<string>().Delay(TimeSpan.FromMilliseconds(2000)))
            .Repeat();
        my.Subscribe(Console.WriteLine);

        var stopwatch = new Stopwatch();
        stopwatch.Start();
        Scheduler.ThreadPool.Schedule(TimeSpan.FromSeconds(1), () => subject.OnNext("1 at " + stopwatch.ElapsedMilliseconds));
        Scheduler.ThreadPool.Schedule(TimeSpan.FromSeconds(2), () => subject.OnNext("2 at " + stopwatch.ElapsedMilliseconds));
        Scheduler.ThreadPool.Schedule(TimeSpan.FromSeconds(3), () => subject.OnNext("3 at " + stopwatch.ElapsedMilliseconds));
        Scheduler.ThreadPool.Schedule(TimeSpan.FromSeconds(4), () => subject.OnNext("4 at " + stopwatch.ElapsedMilliseconds));
        Scheduler.ThreadPool.Schedule(TimeSpan.FromSeconds(5), () => subject.OnNext("5 at " + stopwatch.ElapsedMilliseconds));
        Scheduler.ThreadPool.Schedule(TimeSpan.FromSeconds(6), () => subject.OnNext("6 at " + stopwatch.ElapsedMilliseconds));

        Console.ReadLine();

当我运行这个例子时,结果是完全不确定的:

结果1:

1 at 1006
3 at 3007
5 at 4995

很好的是它忽略了 2 和 4,但即使在这个结果内部也有一些奇怪,因为实际上 3 和 5 之间并没有真正的 2 秒差距。

然而,结果可能更糟。看到这个:

1 at 1003
2 at 2003
4 at 4005
6 at 6004

1 和 2 之间没有 2 秒的间隔。正好是 1 秒。他为什么不把它放在一边?

如果有人能为我澄清事情,我会非常高兴!

编辑

我只是注意到这里可能是错误的合并。如果我将查询重构为 Concat 事情似乎应该发生:

        var my = subject
            .Take(1)
            .Concat(Observable.Empty<string>().Delay(TimeSpan.FromMilliseconds(2000)))
            .Repeat();
4

2 回答 2

1

Windows(和其他桌面操作系统)不是运行时操作系统,因此您不能依赖它的计时器精确到毫秒。特别是如果你有更多的计时器,这可能会导致不确定的行为,这正是你的情况。

这是您的原始序列的工作方式:

  • 时间 ~ 0
    • Take(1)订阅subject
    • 延迟的空 observable 的计时器开始计时
  • 时间 ~ 1
    • subject1,写出 1;在此之后,没有人订阅subject
  • 时间〜2
    • 延迟的空 observable 的计时器用完。因此,再次Take(1)订阅subject并启动另一个用于延迟空可观察对象的计时器
    • 大约在同一时间,2被添加到subject

由于时间上的细微差别,这两个动作在时间大约。2 可以按任何顺序发生。订单很重要,在Take()重新订阅之前或之前添加 2。因此,可以写出 2,也可以不写出。

如果你想要的是这样的序列:

  1. 等待第一个项目并返回它
  2. 等待大约两秒钟
  3. 等待第二个项目并返回它(忽略在两秒等待期间添加的任何内容)
  4. …</li>

那么我认为您编辑中的代码是正确的。

但这决不能保证确定性结果。在我的计算机上,如果我将延迟的空 observable 的等待时间更改为 1960 毫秒,我会在使用Concat().

于 2011-11-12T13:05:20.593 回答
0

只是为了把我的 2c 加到派对上;如果您正在寻找测试的确定性,那么您应该使用 TestScheduler 而不是真正的 ThreadPool/TaskPool/NewThread 调度程序。纯粹是因为 Svick 指出的原因(操作系统调度使船摇摆不定)。

于 2013-04-24T15:21:41.830 回答