2

是否可以强制多个 RX 订阅到不同的 observables连续(不是同时)运行?

我知道我可以为此使用 EventLoopScheduler,但这会降低性能,因为所有处理都将在单个线程上完成。

4

1 回答 1

1

如果您打算运行一个 observable 直到OnCompleted开始下一个,您应该查看Concat. 如果您打算同时订阅多个不同的可观察对象,则可以使用Merge(如果语义对您的场景有意义)。如果 Merge 不合适,我建议在观察者方法或您已经知道的 EventLoopScheduler 中使用标准线程同步方法之一(锁定、监视器等)。

编辑原始答案保留在下面

是的,可以强制串行观察者执行。但是,您是否需要取决于可观察的。一般来说,热的 observables 已经连续运行,而冷的 observables 不会。这是冷热可观察对象工作方式不同的副作用。要使冷的 observable 变热,从而使观察者连续运行,请使用Publish. 这是一个演示各种行为的示例。

Sub Main()
    'hot observable, runs serially
    Dim trigger As New ObsEvent
    Dim eobs = Observable.FromEventPattern(Of ItemEventArgs(Of String))(
                    Sub(h) AddHandler trigger.Triggered, h,
                    Sub(h) RemoveHandler trigger.Triggered, h)
    Dim sub1 = eobs.Subscribe(Sub(v)
                                  Console.WriteLine("Starting event observer 1: {0}", v.EventArgs.Item)
                                  Thread.Sleep(2000)
                                  Console.WriteLine("Ending event observer 1")
                              End Sub)
    trigger.Trigger("event trigger 1")
    Dim sub2 = eobs.Subscribe(Sub(v)
                                  Console.WriteLine("Starting event observer 2: {0}", v.EventArgs.Item)
                                  Thread.Sleep(2000)
                                  Console.WriteLine("Ending event observer 2")
                              End Sub)
    trigger.Trigger("event trigger 2")

    Console.WriteLine()
    Console.WriteLine()

    'cold observable, runs "simultaneously"
    Dim tobs = Observable.Timer(TimeSpan.FromSeconds(5))
    sub1 = tobs.Subscribe(Sub(v)
                              Console.WriteLine("Starting timer observer 1")
                              Thread.Sleep(2000)
                              Console.WriteLine("Ending timer observer 1")
                          End Sub,
                          Sub(ex) Console.WriteLine("Error"),
                          Sub() Console.WriteLine("Observer 1 completed"))
    Thread.Sleep(500)
    sub2 = tobs.Subscribe(Sub(v)
                              Console.WriteLine("Starting timer observer 2")
                              Thread.Sleep(2000)
                              Console.WriteLine("Ending timer observer 2")
                          End Sub,
                          Sub(ex) Console.WriteLine("Error"),
                          Sub() Console.WriteLine("Observer 2 completed"))

    'cold observable turned hot, runs serially
    Dim pobs = tobs.Publish()
    sub1 = pobs.Subscribe(Sub(v)
                              Console.WriteLine("Starting publish observer 1")
                              Thread.Sleep(2000)
                              Console.WriteLine("Ending publish observer 1")
                          End Sub,
                          Sub(ex) Console.WriteLine("Error"),
                          Sub() Console.WriteLine("Observer P1 completed"))
    Thread.Sleep(500)
    sub2 = pobs.Subscribe(Sub(v)
                              Console.WriteLine("Starting publish observer 2")
                              Thread.Sleep(2000)
                              Console.WriteLine("Ending publish observer 2")
                          End Sub,
                          Sub(ex) Console.WriteLine("Error"),
                          Sub() Console.WriteLine("Observer P2 completed"))
    pobs.Connect()

    Console.ReadKey()
End Sub
于 2012-01-01T14:28:17.510 回答