This is a slightly "hacked" solution: it is not immediately clear what the code is doing.
/// <summary>
/// Passes <paramref name="events"/> through and, whenever the events pause for
/// <paramref name="timespan"/>, emits <paramref name="timeoutEvent"/> immediately
/// and then again every <paramref name="timespan"/> until the next event arrives.
/// </summary>
/// <typeparam name="T">Type of the events.</typeparam>
/// <param name="events">Source sequence of events.</param>
/// <param name="timespan">Quiet period that ends a burst; also the interval between repeated timeout events.</param>
/// <param name="timeoutEvent">Value emitted to signal a timeout.</param>
/// <param name="scheduler">Scheduler used for both the throttle and the interval timers.</param>
/// <returns>The source events interleaved with repeated timeout events.</returns>
/// <remarks>
/// Timeout events are only appended after a burst of events, so none are produced
/// before the first source event arrives.
/// </remarks>
public static IObservable<T> RepeatingTimeout<T>(ITestableObservable<T> events, TimeSpan timespan, T timeoutEvent, TestScheduler scheduler)
{
    // Split the source into bursts. Every event gets the same key, so only one
    // burst is open at a time; a burst is closed when its throttled copy emits,
    // i.e. when the gap after the latest event reaches the timespan.
    var bursts = events.GroupByUntil(
        _ => 1,
        item => item,
        burst => burst.Throttle(timespan, scheduler));

    // After each burst ends, append one timeout event right away (StartWith)
    // followed by further timeout events at every interval tick.
    var burstsWithTimeouts = bursts.Select(burst =>
    {
        var timeouts = Observable.Interval(timespan, scheduler)
            .Select(_ => timeoutEvent)
            .StartWith(timeoutEvent);

        return burst.Concat(timeouts);
    });

    // Only follow the most recent burst: a new burst replaces the timeout
    // stream that was appended to the previous one.
    return burstsWithTimeouts.Switch();
}
And this is a test that shows that it works:
/// <summary>
/// Runs <c>RepeatingTimeout</c> against a hot sequence on a virtual-time scheduler
/// and prints every emitted value together with the scheduler clock.
/// This is a trace-style demonstration: it makes no assertions on the output.
/// </summary>
[Test]
public void RepeatedTimeout()
{
    var scheduler = new TestScheduler();

    // Two bursts of events (ticks 4-14 and 30-35), then completion at tick 36.
    var events = scheduler.CreateHotObservable(
        OnNext(4, 1),
        OnNext(7, 2),
        OnNext(10, 3),
        OnNext(14, 4),
        OnNext(30, 5),
        OnNext(35, 6),
        OnCompleted<int>(36)
    );
    var timespan = TimeSpan.FromTicks(5);
    var timeoutEvent = -1;

    // Do does not modify the sequence it is called on; it returns a new sequence
    // that carries the side effect. That returned sequence must be the one that
    // is subscribed to, otherwise the trace below is never printed.
    var traced = RepeatingTimeout(events, timespan, timeoutEvent, scheduler)
        .Do(x => Console.WriteLine("G " + scheduler.Clock + " " + x));

    // Create and subscribe at tick 0, dispose the subscription at tick 100.
    scheduler.Start(() => traced, 0, 0, 100);
}