8

我有一个IObservable以随机间隔产生值的,我想限制这个序列。我发现的一件事是Throttle运营商对“节流”的定义与我的不同。

Throttle仅在指定的时间间隔过后才产生值它产生最后看到的值)。我认为节流意味着在指定的时间间隔内产生值(当然,除非有沉默)。

说,我希望Observable.Interval(100).Select((_,i) => i).Throttle(200)产生(以任何性能/时序问题为模)偶数,因为我将其限制为“半速”。然而,这个序列根本没有产生任何价值,因为从来没有一段长度为 200 的静默期。

所以,我发现这Sample实际上是我想要的“节流”行为。Observable.Interval(100).Select((_,i) => i).Sample(200)产生(再次以任何性能/时序问题为模)偶数序列。

但是,我还有另一个问题:间隔会有所不同,具体取决于最后一个“采样”值。我想要的是编写一个如下所示的运算符:

public static IObservable<T> Sample<T>(this IObservable<T> source, Func<T, TimeSpan> intervalSelector);

intervalSelector参数产生下一个样本的间隔,第一个样本......要么取自第一个值,要么取自附加参数,我不在乎。

我试着写这个,但我最终得到了一个大而复杂的结构,它不能很好地工作。我的问题是,我可以使用现有的运营商(也就是单线)来构建它吗?

4

3 回答 3

6

几个小时后,在上面睡了一会儿,我明白了。

public static IObservable<T> Sample<T>(this IObservable<T> source, Func<T, TimeSpan> intervalSelector)
{
    return source.TimeInterval()
                 .Scan(Tuple.Create(TimeSpan.Zero, false, default(T)), (acc, v) =>
                 {
                     if(v.Interval >= acc.Item1)
                     {
                         return Tuple.Create(intervalSelector(v.Value), true, v.Value);
                     }
                     return Tuple.Create(acc.Item1 - v.Interval, false, v.Value);
                 })
                 .Where(t => t.Item2)
                 .Select(x => x.Item3);
}

这可以按我的意愿工作:每次它产生一个 valuex时,它​​都会停止产生值,直到intervalSelector(x)时间过去。

于 2010-08-16T23:54:12.807 回答
0

不是您要寻找的 Observable.BufferWithTime吗?

于 2010-08-16T05:20:33.447 回答
0

我以另一种方式做到了,我想与任何有用的人分享。

var random = new Random();
Observable.Return(Unit.Default)
            .SelectMany(_ => Observable.Timer(TimeSpan.FromSeconds(random.Next(1, 6))))
            .TimeInterval()
            .Do(value => Console.WriteLine(value))
            .Repeat()
            .Subscribe();
于 2020-02-25T01:42:32.967 回答