/// <summary>
/// Returns the last element that <paramref name="source"/> produced before it either
/// completed or stayed silent for longer than <paramref name="timeSpan"/>.
/// </summary>
/// <typeparam name="T">The type of the elements in the source sequence.</typeparam>
/// <param name="source">The sequence to observe.</param>
/// <param name="timeSpan">The timeout duration handed to the Rx <c>Timeout</c> operator.</param>
/// <returns>
/// The result of <c>LastAsync</c> applied to the source once a timeout has been
/// converted into a normal completion.
/// </returns>
/// <remarks>
/// Only <see cref="TimeoutException"/> is swallowed; any other error raised by
/// <paramref name="source"/> is passed through to the subscriber unchanged.
/// </remarks>
public static IObservable<T> TimeOutExtension<T>(
    this IObservable<T> source,
    TimeSpan timeSpan)
{
    // A single operator chain keeps exactly one subscription to the source per
    // subscriber. That matters for cold sequences, where every extra subscription
    // would restart the producer and repeat its side effects.
    return source
        .Timeout(timeSpan)
        // A timeout is not a failure here: it just marks the end of the values of
        // interest, so replace the TimeoutException with an ordinary completion.
        .Catch<T, TimeoutException>(_ => Observable.Empty<T>())
        // Keep only the final value seen before completion or timeout.
        .LastAsync();
}
// It can be used like this:
// Build a sequence that fires ten values in a quick burst (100 ms apart)
// and then slows down to one value every two seconds.
var quickBurst = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(10);
var slowTail = Observable.Interval(TimeSpan.FromSeconds(2));
var events = quickBurst.Concat(slowTail);

// Apply a one-second timeout; the two-second gap after the burst exceeds it.
var lastBeforeGap = events.TimeOutExtension(TimeSpan.FromSeconds(1));

// Outputs 9, the final value of the quick burst.
lastBeforeGap.Subscribe(value => Console.WriteLine(value));