2

我有以下代码

 static void Main(string[] args)
        {
            //var source = BlockingMethod();
            var source2 = NonBlocking();
            source2.Subscribe(Console.WriteLine);
            //source.Subscribe(Console.WriteLine);
            Console.ReadLine();

        }
            private static IObservable<string> BlockingMethod()
            {
              var subject = new ReplaySubject<string>();
              subject.OnNext("a");
              subject.OnNext("b");
              subject.OnCompleted();
              Thread.Sleep(1000);
              return subject;
            }
            private static IObservable<string> NonBlocking()
            {
                return Observable.Create<string>(
                    observable =>
                        {
                            observable.OnNext("c");
                            observable.OnNext("d");
                            observable.OnCompleted();
                            //Thread.Sleep(1000);

                            var source = BlockingMethod();
                            source.Subscribe(Console.WriteLine);

                            return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
                            //or can return an Action like
                            //return () => Console.WriteLine("Observer has unsubscribed");
                        });
            }
        }

哪个打印

c
d
Observer has unsubscribed
a
b

谁能帮我了解程序中的控制流程。我确实尝试阅读调用堆栈等。但无法理解所有内容。

编辑 为什么我得到上面的输出(我认为是正确的)而不是

 c 
 d 
 a 
 b 
 Observer has unsubscribed
4

1 回答 1

2

您的预期行为和实际行为的差异来自以下行:

var subject = new ReplaySubject<string>();

默认情况下 aReplaySubject使用Scheduler.CurrentThread. 就好像你这样声明它:

var subject = new ReplaySubject<string>(Scheduler.CurrentThread);

使用当前线程进行调度时,您的操作会排队 - 等待当前执行的代码在它开始之前完成。如果您希望代码立即运行,您需要Scheduler.Immediate像这样使用:

var subject = new ReplaySubject<string>(Scheduler.Immediate);

这足以解释它吗?

于 2012-01-22T00:20:47.273 回答