2

我对使用 Observable.Publish 进行多播处理的生命周期有点困惑。应该如何正确使用连接?与直觉相反,我发现我不需要为多播观察者调用 connect 来开始他们的订阅。

var multicast = source.Publish();
var field0 = multicast.Select(record => record.field0);
var field1 = multicast.Select(record => record.field1);

// Do I need t*emphasized text*o call here?
var disposable = multicast.connect()

// Does calling 
disposable.Dispose();
// unsubscribe field0 and field1?

编辑

我的困惑是为什么我没有在 IConnectableObservable 上显式调用 Connect 时成功订阅。但是我在 IConnectableObservable 上调用 Await 隐式调用 Connect

Public Async Function MonitorMeasurements() As Task


    Dim cts = New CancellationTokenSource

    Try
        Using dialog = New TaskDialog(Of Unit)(cts)

            Dim measurementPoints = 
                MeasurementPointObserver(timeout:=TimeSpan.FromSeconds(2)).
                TakeUntil(dialog.CancelObserved).Publish()

            Dim viewModel = New MeasurementViewModel(measurementPoints)
            dialog.Content = New MeasurementControl(viewModel)
            dialog.Show()

            Await measurementPoints
        End Using
    Catch ex As TimeoutException
        MessageBox.Show(ex.Message)
    Catch ex As Exception
        MessageBox.Show(ex.Message)
    End Try

End Function

请注意,当按下取消按钮时,我的 TaskDialog 公开了一个名为 CancelObserved 的可观察对象。

解决方案

解决方案由@asti 在链接中发布。这是该链接中 RX 团队的引述

请注意,使用 await 会导致订阅发生,从而使可观察序列变热。此版本中包含对 IConnectableObservable 的 await 支持,这会导致将序列连接到其源并订阅它。如果没有 Connect 调用,等待操作将永远无法完成

4

2 回答 2

6

Publish在源上返回一个IConnectableObservable<T>本质上是IObservable<T>带有Connect方法的。您可以使用ConnectIDisposable返回来控制对源的订阅。

Rx 被设计成一个“一劳永逸”的系统。在您明确处置它们或它们完成/错误之前,订阅不会被终止。

disp0 = field0.Subscribe(...); disp1 = field1.Subscribe(...) - 订阅在明确处理之前不会终止disp0, disp1- 这与与多播源的连接无关。

您可以在不干扰下方管道的情况下连接和断开连接。不用担心手动管理连接的一种更简单的方法是使用.Publish().RefCount()which 将保持连接,只要至少有一个观察者仍然订阅它。这被称为预热 observable。


已更新问题中的编辑

OP 正在调用await.IConnectableObservable<T>

来自Rx 的发行说明:

..await 的使用通过导致订阅发生而使可观察序列变热。此版本中包含对 IConnectableObservable 的 await 支持,这会导致将序列连接到其源并订阅它。如果没有 Connect 调用,等待操作将永远无法完成。

示例(取自同一页)

static async  void Foo()
{
    var xs = Observable.Defer(() =>
    {
        Console.WriteLine("Operation started!");
        return Observable.Interval(TimeSpan.FromSeconds(1)).Take(10);
    });

    var ys = xs.Publish();

    // This doesn't trigger a connection with the source yet.
    ys.Subscribe(x => Console.WriteLine("Value = " + x));

    // During the asynchronous sleep, nothing will be printed.
    await Task.Delay(5000);

    // Awaiting causes the connection to be made. Values will be printed now,
    // and the code below will return 9 after 10 seconds.
    var y =  await ys;
    Console.WriteLine("Await result = " + y);
}
于 2012-10-22T08:48:25.277 回答
4

发布允许您共享订阅。这显然对于使 Cold 可观察序列 Hot 最有用。即采用导致某些订阅副作用(可能是到网络的连接)发生的序列,并确保副作用执行一次并且在消费者之间共享序列的结果。

在实践中,您在冷序列上调用发布,订阅您的消费者,然后在订阅之后连接已发布的序列以减轻任何竞争条件。

所以基本上,你在上面做了什么。

对于已经很热门的序列,例如 Subjects、FromEventPattern 或已经发布和连接的东西,这在很大程度上是没有意义的。

处理 Connect() 方法中的值将“断开”序列,从而阻止消费者获得更多值。如果消费者订阅中的任何一个想要提前分离,您也可以处理这些订阅。

说了这么多,你似乎在做正确的事情。你看到的问题是什么?我假设您正在连接到一个已经很热的序列。

于 2012-10-22T08:36:10.487 回答