给定一个IObservable<T>
使用Throttle
行为的方法(在添加项目时重置计时器,但它是否返回在该时间内添加的所有项目的集合?
Buffer
提供了一个类似的功能,它IList<T>
在每个时间跨度或计数上将数据分块。但是每次添加项目时,我都需要时间来重置。
我在这里看到了一个类似的问题,反应式扩展是否支持滚动缓冲区?,但答案似乎并不理想,而且它有点旧,所以我想知道 Rx-Main 的发布版本现在是否支持这个功能。
给定一个IObservable<T>
使用Throttle
行为的方法(在添加项目时重置计时器,但它是否返回在该时间内添加的所有项目的集合?
Buffer
提供了一个类似的功能,它IList<T>
在每个时间跨度或计数上将数据分块。但是每次添加项目时,我都需要时间来重置。
我在这里看到了一个类似的问题,反应式扩展是否支持滚动缓冲区?,但答案似乎并不理想,而且它有点旧,所以我想知道 Rx-Main 的发布版本现在是否支持这个功能。
正如我在另一篇文章中回答的那样,是的,你可以!使用Throttle
和Window
方法Observable
:
public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay)
{
var closes = stream.Throttle(delay);
return stream.Window(() => closes).SelectMany(window => window.ToList());
}
BufferUntilInactive
我通过添加一个组件修改了上校 Panic 的运算符Publish
,以便它也可以与冷的 observables 一起正常工作:
/// <summary>Projects each element of an observable sequence into consecutive
/// non-overlapping buffers, which are produced based on time and activity,
/// using the specified scheduler to run timers.</summary>
public static IObservable<IList<T>> BufferUntilInactive<T>(
this IObservable<T> source, TimeSpan dueTime, IScheduler scheduler = default)
{
scheduler ??= Scheduler.Default;
return source.Publish(published =>
published
.Window(() => published.Throttle(dueTime, scheduler))
.SelectMany(window => window.ToList())
);
}
为了完整起见,我还添加了一个可选IScheduler
参数,用于配置运行计时器的调度程序。