-2

一旦另一个订阅完成,我就有一个开始订阅的场景。两者都共享相同的连接流。

IConnectableObservable<List<int>> stream = GetMyHotStream();
var firstSubscription = stream.Subscribe(s=>{});
var secondSubscription = stream.Subscribe(s=> {});
stream.Connect();

现在我想等到secondSubscription完成firstSubscription

为简洁起见,我省略了其他细节。

任何答案都会非常有帮助..谢谢

4

3 回答 3

0

这是一个示例,每次对热可观察对象进行更新时,第一个订阅都会打印出“Sub 1 item”。我使用 Take(5) 来确保第一次订阅完成。之后,第二次订阅将继续从原始来源获取项目。

var s = Observable.Interval(TimeSpan.FromSeconds(1)).Publish();

var sub1 = s.Take(5).Dump("Sub 1 item");
var sub2 = sub1.IgnoreElements().Concat(s).Dump("Sub 2 item");

s.Connect();

输出:

Sub 1 item →0
Sub 1 item →1
Sub 1 item →2
Sub 1 item →3
Sub 1 item →4
Sub 2 item →5
Sub 2 item →6
Sub 2 item →7

这是你所追求的吗?问候,

霍华德

于 2013-05-31T14:00:25.580 回答
0

据我了解,您正在尝试保证两个订阅的执行顺序。问题是,在设置时,它们是独立的。我认为最好的解决方案就是将它们放在一个操作中。

stream.Subscribe(s =>
{
   FirstSubscriptionStuff(s);
   SecondSubscriptionStuff(s):
});
于 2013-05-31T07:33:55.600 回答
-1

我写了类似的东西

public static IObservable<TSource> WaitUntil<TSource>(this IObservable<TSource> source, IObservable<Unit> completionSource)
    {
        return new WaitUntil<TSource>(source, completionSource);
    }

并通过在completionSource的OnCompletion中订阅源来实现WaitUntil。

这个的用法是

var asyncSubject = new AsyncSubject();

        var secondStream = stream.WaitUntil(asyncSubject).Subscribe(s =>
            {
            });

        var firstStream = stream.SubscribeOn(Scheduler.NewThread).Subscribe(s =>
            { },
            () => 
            {
                asyncSubject.OnCompleted(); 
             });

        stream.Connect();

不确定这有多优雅,因为我刚开始在我的项目中使用 Rx,但这很好用。

于 2013-05-31T16:24:03.600 回答