2

我在理解 Observable.Delay 的工作原理以及何时调用 Dispose() 时遇到问题。熟悉 Rx 的人可以帮忙吗?

以下代码片段:

    static void Main(string[] args)
    {
        var oneNumberEveryFiveSeconds = new SomeObservable();
        // Instant echo
        oneNumberEveryFiveSeconds.SubscribeOn(Scheduler.ThreadPool).Subscribe(num => Console.WriteLine(num));
        // One second delay
        oneNumberEveryFiveSeconds.Delay(TimeSpan.FromSeconds(1)).SubscribeOn(Scheduler.ThreadPool).Subscribe(num => Console.WriteLine("...{0}...", num));
        // Two second delay
        oneNumberEveryFiveSeconds.Delay(TimeSpan.FromSeconds(2)).SubscribeOn(Scheduler.ThreadPool).Subscribe(num => Console.WriteLine("......{0}......", num));

        Console.ReadKey();
    }

    public class SomeObservable : IObservable<int>
    {
        public IDisposable Subscribe(IObserver<int> o)
        {
            for (var i = 0; i < 2; i++)
            {
                o.OnNext(i);
            }
            o.OnCompleted();

            return new DisposableAction(() => { Console.WriteLine("DISPOSED"); });
        }
    }

    public class DisposableAction : IDisposable
    {
        public DisposableAction(Action dispose)
        {
            this.dispose = dispose;
        }

        readonly Action dispose;

        public void Dispose()
        {
            dispose();
        }
    }

产生以下结果:

0
1
已处置
已处置 已
处置
...0...
...1...
......0......
......1......

我期待它更像:

0
1
弃置
...0...
...1...
弃置
......0......
......1......
弃置

任何想法??

4

3 回答 3

2

The standard functionality of Rx is to dispose a subscription when the sequence completes (even if its values are still being piped through another sequence).

With that in mind, Delay cannot control the speed of values being emitted from the source sequence, it can only delay the values to its own observers.

于 2010-10-25T12:16:38.413 回答
0

如果我不得不猜测,我会说延迟将来自原始 observable 的项目排队,然后根据指定的延迟在它认为合适的时候分派它们。因此,即使原始的 observable 早已被处理掉,由 Delay 方法创建的 observable 仍然是活跃的。您观察到的行为非常符合这个解释。

于 2010-10-20T20:55:12.593 回答
-1

没有 ThreadPool 的行为是相同的:

0 1 已处理 已处理 ...0... ...1... ......0...... ......1......

于 2010-10-20T02:52:26.860 回答