0

迈出 Rx 的第一步,我被困在这里:

public class DisposableResourceDemo : IDisposable
{
    public DisposableResourceDemo() {
        Console.WriteLine("DisposableResourceDemo constructor.");
    }

    public void Dispose() {
        Console.WriteLine("DisposableResourceDemo.Dispose()");
    }

    public void SideEffect() {
        Console.WriteLine("DisposableResourceDemo.SideEffect()");
    }
}

[Test]
public void ShowBehaviourOfRxUsing()
{
    var test = Observable.Using(() =>
        {
             // This should happen exactly once, independent of number of subscriptions,
             // object should be disposed on last subscription disposal or OnCompleted call 
                return new DisposableResourceDemo();
        },
        (dr) =>
        {
            return Observable.Create<string>(
                (IObserver<string> observer) =>
                {
                    dr.SideEffect();
                    var dummySource = Observable.Return<string>("Some Text");

                    return dummySource.Subscribe(observer);
                });
        }).Publish().RefCount();


    Console.WriteLine("before 1st subscription.");
    test.Subscribe(Console.WriteLine, () => Console.WriteLine("OnCompleted in 1st."));
    Console.WriteLine("before 2nd subscription.");
    test.Subscribe(Console.WriteLine, () => Console.WriteLine("OnCompleted in 2nd."));
}

令我惊讶的是,上面的代码产生了

before 1st subscription.
DisposableResourceDemo constructor.
DisposableResourceDemo.SideEffect()
Some Text
OnCompleted in 1st.
DisposableResourceDemo.Dispose()
before 2nd subscription.
--> [happy with missing "Some Text" here]
OnCompleted in 2nd.
--> [unhappy with second instantiation here] 
DisposableResourceDemo constructor.
DisposableResourceDemo.SideEffect()
DisposableResourceDemo.Dispose()

请注意,在两个订阅之后手动调用 Connect() 不是我想要的,尽管输出符合预期。

4

1 回答 1

0

我不完全确定您要在这里实现什么。您似乎想共享可观察序列及其相关资源。因此,执行此操作的标准方法是使用从 .Replay() 和 .Publish() 等获得的 ConnectableObservable 类型

您说您不想使用 .Connect() 而是使用非常常见的 .RefCount() 。但是,您的序列完成。您还使用了扩展方法 Subscribe(...),它将在内部创建一个自动分离观察者,即当序列完成时,它将断开连接。

所以我的问题是,内部序列真的应该完成吗?如果答案是肯定的,那么为什么第二次订阅会收到 OnComplete 通知……它已经发生了,它已经过去了。也许您确实想重播 OnComplete,在这种情况下,也许 .Replay(1) 就是您想要的。如果答案是否定的,那么您可以通过在 .Publish() 之前或 Observable.Return 之后放置 Concat(Observable.Never<string>()) 来轻松解决此问题。

于 2012-07-25T10:04:42.440 回答