我有一堆事件进来,我必须毫无损失地执行所有这些事件,但我想确保它们在适当的时间段被缓冲和消耗。有人有解决方案吗?
我在 Rx 中找不到任何可以在不丢失事件的情况下做到这一点的操作员(Throttle - 丢失事件)。我也考虑过缓冲、延迟等...找不到好的解决方案。
我试图在中间放一个计时器,但不知何故它根本不起作用:
GetInitSequence()
.IntervalThrottle(TimeSpan.FromSeconds(5))
.Subscribe(
item =>
{
Console.WriteLine(DateTime.Now);
// Process item
}
);
public static IObservable<T> IntervalThrottle<T>(this IObservable<T> source, TimeSpan dueTime)
{
return Observable.Create<T>(o =>
{
return source.Subscribe(x =>
{
new Timer(state =>
o.OnNext((T)state), x, dueTime, TimeSpan.FromMilliseconds(-1));
}, o.OnError, o.OnCompleted);
});
}