是否可以强制多个 RX 订阅到不同的 observables连续(不是同时)运行?
我知道我可以为此使用 EventLoopScheduler,但这会降低性能,因为所有处理都将在单个线程上完成。
是否可以强制多个 RX 订阅到不同的 observables连续(不是同时)运行?
我知道我可以为此使用 EventLoopScheduler,但这会降低性能,因为所有处理都将在单个线程上完成。
如果您打算运行一个 observable 直到OnCompleted开始下一个,您应该查看Concat. 如果您打算同时订阅多个不同的可观察对象,则可以使用Merge(如果语义对您的场景有意义)。如果 Merge 不合适,我建议在观察者方法或您已经知道的 EventLoopScheduler 中使用标准线程同步方法之一(锁定、监视器等)。
编辑原始答案保留在下面
是的,可以强制串行观察者执行。但是,您是否需要取决于可观察的。一般来说,热的 observables 已经连续运行,而冷的 observables 不会。这是冷热可观察对象工作方式不同的副作用。要使冷的 observable 变热,从而使观察者连续运行,请使用Publish. 这是一个演示各种行为的示例。
Sub Main()
'hot observable, runs serially
Dim trigger As New ObsEvent
Dim eobs = Observable.FromEventPattern(Of ItemEventArgs(Of String))(
Sub(h) AddHandler trigger.Triggered, h,
Sub(h) RemoveHandler trigger.Triggered, h)
Dim sub1 = eobs.Subscribe(Sub(v)
Console.WriteLine("Starting event observer 1: {0}", v.EventArgs.Item)
Thread.Sleep(2000)
Console.WriteLine("Ending event observer 1")
End Sub)
trigger.Trigger("event trigger 1")
Dim sub2 = eobs.Subscribe(Sub(v)
Console.WriteLine("Starting event observer 2: {0}", v.EventArgs.Item)
Thread.Sleep(2000)
Console.WriteLine("Ending event observer 2")
End Sub)
trigger.Trigger("event trigger 2")
Console.WriteLine()
Console.WriteLine()
'cold observable, runs "simultaneously"
Dim tobs = Observable.Timer(TimeSpan.FromSeconds(5))
sub1 = tobs.Subscribe(Sub(v)
Console.WriteLine("Starting timer observer 1")
Thread.Sleep(2000)
Console.WriteLine("Ending timer observer 1")
End Sub,
Sub(ex) Console.WriteLine("Error"),
Sub() Console.WriteLine("Observer 1 completed"))
Thread.Sleep(500)
sub2 = tobs.Subscribe(Sub(v)
Console.WriteLine("Starting timer observer 2")
Thread.Sleep(2000)
Console.WriteLine("Ending timer observer 2")
End Sub,
Sub(ex) Console.WriteLine("Error"),
Sub() Console.WriteLine("Observer 2 completed"))
'cold observable turned hot, runs serially
Dim pobs = tobs.Publish()
sub1 = pobs.Subscribe(Sub(v)
Console.WriteLine("Starting publish observer 1")
Thread.Sleep(2000)
Console.WriteLine("Ending publish observer 1")
End Sub,
Sub(ex) Console.WriteLine("Error"),
Sub() Console.WriteLine("Observer P1 completed"))
Thread.Sleep(500)
sub2 = pobs.Subscribe(Sub(v)
Console.WriteLine("Starting publish observer 2")
Thread.Sleep(2000)
Console.WriteLine("Ending publish observer 2")
End Sub,
Sub(ex) Console.WriteLine("Error"),
Sub() Console.WriteLine("Observer P2 completed"))
pobs.Connect()
Console.ReadKey()
End Sub