0

我有两个热门的 observables 我不想错过任何通知

第一个可观察的产生数字

1-2-3-4

和第二弦

抗体

我正在寻找一种方法来压缩它们以产生下一个输出

a-1 b-2 a-3 b-4 a-5

所以我想要的是无限期地重复第二个流,直到第一个流被取消订阅。我尝试了类似下一个测试的方法,它产生了所需的输出。但是我Repeat永远不会,因为我不知道如何停下来测试结果。此外,我使用了ReplaySubject手动完成的,这意味着我无法收到任何新通知。

    class MyClass:ReactiveTest{
        [Fact]
        public void MethodName4() {
            var strings = new ReplaySubject<string>();
            var testScheduler = new TestScheduler();
            testScheduler.Schedule("", TimeSpan.FromTicks(10), (scheduler, s) => {
                strings.OnNext("a");
                strings.OnNext("b");
                strings.OnCompleted();
            });

            var numbers = testScheduler.CreateHotObservable(OnNext(100, 1), OnNext(200, 2), OnNext(300, 3), OnNext(400, 4), OnNext(500, 5));
            numbers.Zip(Observable.Defer(() => strings).Repeat(3), (i, s) => (i, s)).Subscribe();
            testScheduler.AdvanceBy(500);
        }

    }
4

2 回答 2

1

我想我找到了一个我将尝试描述的解决方案。

重复一个 hot observable 是没有意义的,因为它永远不会完成。但是,您可以重复这个 hot observable 的窗口。我从@Enigmativity 重构了这个答案,以重复所有值而不是最后一个值。

    public static IObservable<T> RepeatAllDuringSilence<T>(this IObservable<T> source, TimeSpan maxQuietPeriod,
        IScheduler scheduler=null, IObservable<T> inner=null) {
        if (scheduler==null)
            scheduler=Scheduler.Default;
        if (inner == null)
            inner = source;
        return Observable.Create<T>(observer => {
            var replay = inner.Replay(scheduler);
            var sequence = source.Select(x => Observable.Interval(maxQuietPeriod, scheduler).SelectMany(l => replay).StartWith(x)).Switch();
            return new CompositeDisposable(replay.Connect(), sequence.Subscribe(observer));
        });
    }

然后在我的测试中使用这种方法,我设法在没有完成ReplaySubject或丢失未来通知的情况下获得预期的结果,因为我改变了我原来可观察的温度。

class MyClass : ReactiveTest {
    [Fact]
    public void MethodName4() {
        var strings = new Subject<string>();
        var testScheduler = new TestScheduler();
        testScheduler.Schedule("", TimeSpan.FromTicks(10), (scheduler, s) => {
            strings.OnNext("a");
            strings.OnNext("b");
        });
        testScheduler.Schedule("", TimeSpan.FromTicks(20), (scheduler, s) => {
            strings.OnNext("c");
        });

        var numbers = testScheduler.CreateHotObservable(OnNext(10, 1), OnNext(20, 2), OnNext(30, 3), OnNext(40, 4), OnNext(50, 5));
        numbers.Zip(strings.RepeatAllDuringSilence(TimeSpan.FromTicks(10),testScheduler), (i, s) => (i, s)).Subscribe(tuple => WriteLine(tuple));
        testScheduler.AdvanceBy(50);
    }

}

(1, a)
(2, b)
(3, c)
(4, a)
(5, b)

于 2018-01-29T09:29:12.927 回答
0

这不能解决你的问题吗?:

  void TestMergedPairs()
    {
        var source1 = Observable.Range(1,9999);
        var source2 = Observable.Range(65, 2).Repeat();


        var z = source1.Zip(source2,(x,y)=> { return new { left = x, right = y }; })
             .TakeWhile((v)=>v.left<1000);


        z.Subscribe(x => { Console.WriteLine( x.left +" "+x.right ); });
    }

如果不是,那么不清楚你的问题是什么。如果您希望能够不断重复第二个可观察的历史内容,从某种意义上说,您的“a”、“b”可能稍后会变成“a”、“b”、“c”,那么这个 3 字母序列应该进行迭代,那么您的源可能是聚合历史窗口的 .Scan 的结果。

于 2018-01-29T10:31:41.333 回答