我对使用 Observable.Publish 进行多播处理的生命周期有点困惑。应该如何正确使用连接?与直觉相反,我发现我不需要为多播观察者调用 connect 来开始他们的订阅。
var multicast = source.Publish();
var field0 = multicast.Select(record => record.field0);
var field1 = multicast.Select(record => record.field1);
// Do I need t*emphasized text*o call here?
var disposable = multicast.connect()
// Does calling
disposable.Dispose();
// unsubscribe field0 and field1?
编辑
我的困惑是为什么我没有在 IConnectableObservable 上显式调用 Connect 时成功订阅。但是我在 IConnectableObservable 上调用 Await 隐式调用 Connect
Public Async Function MonitorMeasurements() As Task
Dim cts = New CancellationTokenSource
Try
Using dialog = New TaskDialog(Of Unit)(cts)
Dim measurementPoints =
MeasurementPointObserver(timeout:=TimeSpan.FromSeconds(2)).
TakeUntil(dialog.CancelObserved).Publish()
Dim viewModel = New MeasurementViewModel(measurementPoints)
dialog.Content = New MeasurementControl(viewModel)
dialog.Show()
Await measurementPoints
End Using
Catch ex As TimeoutException
MessageBox.Show(ex.Message)
Catch ex As Exception
MessageBox.Show(ex.Message)
End Try
End Function
请注意,当按下取消按钮时,我的 TaskDialog 公开了一个名为 CancelObserved 的可观察对象。
解决方案
解决方案由@asti 在链接中发布。这是该链接中 RX 团队的引述
请注意,使用 await 会导致订阅发生,从而使可观察序列变热。此版本中包含对 IConnectableObservable 的 await 支持,这会导致将序列连接到其源并订阅它。如果没有 Connect 调用,等待操作将永远无法完成