1

我有一个发送事件的可观察流:

---*--*--*---*------------------------*---*---***--**-----

我想将此流包装到另一个流中,如果在最后 N 秒内没有事件,则每 N 秒添加一次“请保持”事件:

期望的结果:(X 是新注入的事件

---*--*--*---*---X---X---X---X---X---X*---*---***--**---X-

我正在尝试以惯用的反应方式对其进行建模,但还没有找到可以让我轻松做到这一点的运算符。超时是似乎即将结束的那个,除了它不重复,它实际上终止了流。

4

2 回答 2

0

如果以后有人遇到这个问题,这就是我最终得到的解决方案:

 public static class ObservableExtensions
 {
    public static IObservable<T> PulseDuringInactivity<T>(this IObservable<T> source, TimeSpan pulseInterval, IScheduler scheduler = null)
    {
        return source
            //for each incoming event, we create a new observable stream, that contains this event being repeated infinitely many times over at our pulse interval
            //these repeating values will serve as the pulse
            .Select(sourceValue => RepeatingObservable(sourceValue, pulseInterval, scheduler))

            //However obviously that creates way too many events.  What we we'll do is throw away all of these repeating observables, as soon as there's a single event coming in on any of our streams.
            //this is what .Switch() does, and it immediately unsubscribes from the now-irrelevant streams, which can then be gc'd.  For more information about the workings of this technique,
            //check out the tests in ObservableExtensionTests.cs
            .Switch();
    }

    static IObservable<T> RepeatingObservable<T>(T valueToRepeat, TimeSpan repeatInterval, IScheduler scheduler)
    {
        return Observable.Interval(repeatInterval, scheduler ?? Scheduler.Default).Select(_ => valueToRepeat);
    }
}
于 2016-12-08T09:35:10.917 回答
0

This is slightly "hacked" solution. It is not clear what the code is doing.

public static IObservable<T> RepeatingTimeout<T>(ITestableObservable<T> events, TimeSpan timespan, T timeoutEvent, TestScheduler scheduler)
{
    var groups = events
        .GroupByUntil(x => 1, g => g, g => g.Throttle(timespan, scheduler)) // group events into observables so that each observable contains sequence of events where each event is sooner than timespan
        .Select(x => 
            x.Concat(Observable.Interval(timespan, scheduler).Select(_ => timeoutEvent) // concatenate timeout events at intervals after each group of events
            .StartWith(timeoutEvent))
            )
        .Switch(); // return events from latest observable
    return groups;
}

And this is test that shows that it works:

[Test]
public void RepeatedTimeout()
{
    var scheduler = new TestScheduler();

    var events = scheduler.CreateHotObservable(
        OnNext(4, 1),
        OnNext(7, 2),
        OnNext(10, 3),
        OnNext(14, 4),
        OnNext(30, 5),
        OnNext(35, 6),
        OnCompleted<int>(36)
        );

    var timespan = TimeSpan.FromTicks(5);
    var timeoutEvent = -1;

    var groups = RepeatingTimeout(events, timespan, timeoutEvent, scheduler);

    groups.Do(x => Console.WriteLine("G " + scheduler.Clock + " " + x));


    var results = scheduler.Start(() => groups, 0, 0, 100);
}
于 2016-11-25T08:12:11.477 回答