是否可以强制多个 RX 订阅到不同的 observables连续(不是同时)运行?
我知道我可以为此使用 EventLoopScheduler,但这会降低性能,因为所有处理都将在单个线程上完成。
是否可以强制多个 RX 订阅到不同的 observables连续(不是同时)运行?
我知道我可以为此使用 EventLoopScheduler,但这会降低性能,因为所有处理都将在单个线程上完成。
如果您打算运行一个 observable 直到OnCompleted
开始下一个,您应该查看Concat
. 如果您打算同时订阅多个不同的可观察对象,则可以使用Merge
(如果语义对您的场景有意义)。如果 Merge 不合适,我建议在观察者方法或您已经知道的 EventLoopScheduler 中使用标准线程同步方法之一(锁定、监视器等)。
编辑原始答案保留在下面
是的,可以强制串行观察者执行。但是,您是否需要取决于可观察的。一般来说,热的 observables 已经连续运行,而冷的 observables 不会。这是冷热可观察对象工作方式不同的副作用。要使冷的 observable 变热,从而使观察者连续运行,请使用Publish
. 这是一个演示各种行为的示例。
Sub Main()
'hot observable, runs serially
Dim trigger As New ObsEvent
Dim eobs = Observable.FromEventPattern(Of ItemEventArgs(Of String))(
Sub(h) AddHandler trigger.Triggered, h,
Sub(h) RemoveHandler trigger.Triggered, h)
Dim sub1 = eobs.Subscribe(Sub(v)
Console.WriteLine("Starting event observer 1: {0}", v.EventArgs.Item)
Thread.Sleep(2000)
Console.WriteLine("Ending event observer 1")
End Sub)
trigger.Trigger("event trigger 1")
Dim sub2 = eobs.Subscribe(Sub(v)
Console.WriteLine("Starting event observer 2: {0}", v.EventArgs.Item)
Thread.Sleep(2000)
Console.WriteLine("Ending event observer 2")
End Sub)
trigger.Trigger("event trigger 2")
Console.WriteLine()
Console.WriteLine()
'cold observable, runs "simultaneously"
Dim tobs = Observable.Timer(TimeSpan.FromSeconds(5))
sub1 = tobs.Subscribe(Sub(v)
Console.WriteLine("Starting timer observer 1")
Thread.Sleep(2000)
Console.WriteLine("Ending timer observer 1")
End Sub,
Sub(ex) Console.WriteLine("Error"),
Sub() Console.WriteLine("Observer 1 completed"))
Thread.Sleep(500)
sub2 = tobs.Subscribe(Sub(v)
Console.WriteLine("Starting timer observer 2")
Thread.Sleep(2000)
Console.WriteLine("Ending timer observer 2")
End Sub,
Sub(ex) Console.WriteLine("Error"),
Sub() Console.WriteLine("Observer 2 completed"))
'cold observable turned hot, runs serially
Dim pobs = tobs.Publish()
sub1 = pobs.Subscribe(Sub(v)
Console.WriteLine("Starting publish observer 1")
Thread.Sleep(2000)
Console.WriteLine("Ending publish observer 1")
End Sub,
Sub(ex) Console.WriteLine("Error"),
Sub() Console.WriteLine("Observer P1 completed"))
Thread.Sleep(500)
sub2 = pobs.Subscribe(Sub(v)
Console.WriteLine("Starting publish observer 2")
Thread.Sleep(2000)
Console.WriteLine("Ending publish observer 2")
End Sub,
Sub(ex) Console.WriteLine("Error"),
Sub() Console.WriteLine("Observer P2 completed"))
pobs.Connect()
Console.ReadKey()
End Sub