11

给定一个IObservable<T>使用Throttle行为的方法(在添加项目时重置计时器,但它是否返回在该时间内添加的所有项目的集合?

Buffer提供了一个类似的功能,它IList<T>在每个时间跨度或计数上将数据分块。但是每次添加项目时,我都需要时间来重置。

我在这里看到了一个类似的问题,反应式扩展是否支持滚动缓冲区?,但答案似乎并不理想,而且它有点旧,所以我想知道 Rx-Main 的发布版本现在是否支持这个功能。

4

3 回答 3

13

正如我在另一篇文章中回答的那样,是的,你可以!使用ThrottleWindow方法Observable

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay)
{
    var closes = stream.Throttle(delay);
    return stream.Window(() => closes).SelectMany(window => window.ToList());
}
于 2012-03-20T17:28:55.250 回答
4

BufferUntilInactive我通过添加一个组件修改了上校 Panic 的运算符Publish,以便它也可以与冷的 observables 一起正常工作:

/// <summary>Projects each element of an observable sequence into consecutive
/// non-overlapping buffers, which are produced based on time and activity,
/// using the specified scheduler to run timers.</summary>
public static IObservable<IList<T>> BufferUntilInactive<T>(
    this IObservable<T> source, TimeSpan dueTime, IScheduler scheduler = default)
{
    scheduler ??= Scheduler.Default;
    return source.Publish(published =>
        published
            .Window(() => published.Throttle(dueTime, scheduler))
            .SelectMany(window => window.ToList())
    );
}

为了完整起见,我还添加了一个可选IScheduler参数,用于配置运行计时器的调度程序。

于 2020-12-27T09:26:51.677 回答
0

它不能与

Observable.BufferWithTimeOrCount<TSource> Method (IObservable<TSource>, TimeSpan, Int32)?

于 2012-01-13T12:19:12.147 回答