这是一个令人惊讶的难以解决的问题。更是如此,因为使用Zip
运算符将可观察对象与 , 对齐的诱人想法Observable.Interval
是错误的并且效率低下。与非对称可观察对象一起使用时,运算符的主要问题Zip
是它缓冲了最快生成的可观察对象的元素,从而可能导致在长期订阅期间分配大量内存。恕我直言,此运算符的使用应仅限于预期在长期内产生相等(或接近相等)数量的元素的成对可观察对象。
当发出值比源 observable 更快时, Zip
+Observable.Interval
组合的错误行为就会出现。Observable.Interval
在这种情况下,由 发出的多余值Observable.Interval
被缓冲,因此当源 observable 发出下一个元素时,已经有一个缓冲Interval
值形成一对,导致违反“元素之间的最小间隔”策略。
下面是一个自定义WithInterval
运算符的实现,它在可观察序列的连续元素之间施加最小间隔。然后,此运算符将用于解决此问题的特定问题,该问题涉及缓冲区而不是单个元素:
/// <summary>Intercepts a minimum interval between consecutive elements of an
/// observable sequence.</summary>
public static IObservable<T> WithInterval<T>(this IObservable<T> source,
TimeSpan interval, IScheduler scheduler = null)
{
return source
.Scan((Observable.Return(0L), (IObservable<T>)null), (state, x) =>
{
var (previousTimer, _) = state;
var timer = (scheduler != null ? Observable.Timer(interval, scheduler)
: Observable.Timer(interval)).PublishLast();
var delayed = previousTimer.Select(_ => x).Finally(() => timer.Connect());
return (timer, delayed);
})
.Select(e => e.Item2)
.Concat();
}
Observable.Timer
此实现在连续元素之间放置一个。棘手的部分是如何在正确的时刻激活每个计时器。它是通过Publish
设置计时器来实现的,并在每个计时器完成时让下一个计时器预热(Connect
)。
有了这个操作符,实现一个自定义BatchWithInterval
操作符就很简单了:
/// <summary>Projects each element of an observable sequence into consecutive
/// non-overlapping buffers which are produced based on element count information,
/// intercepting a minimum interval between consecutive buffers.</summary>
public static IObservable<IList<T>> BatchWithInterval<T>(this IObservable<T> source,
int count, TimeSpan interval, IScheduler scheduler = null)
{
return source.Buffer(count).WithInterval(interval, scheduler);
}
使用示例:
var subscription = eventAsObservable
.BatchWithInterval(50, TimeSpan.FromSeconds(10))
.Subscribe(DoProcessing);