27

我正在使用响应式扩展将数据整理到 100 毫秒的缓冲区中:

this.subscription = this.dataService
    .Where(x => !string.Equals("FOO", x.Key.Source))
    .Buffer(TimeSpan.FromMilliseconds(100))
    .ObserveOn(this.dispatcherService)
    .Where(x => x.Count != 0)
    .Subscribe(this.OnBufferReceived);

这工作正常。但是,我想要的行为与操作提供的行为略有不同Buffer。本质上,如果收到另一个数据项,我想重置计时器。只有当整个 100 毫秒都没有收到数据时,我才想处理它。这开启了从不处理数据的可能性,因此我也应该能够指定最大计数。我会想象一些类似的东西:

.SlidingBuffer(TimeSpan.FromMilliseconds(100), 10000)

我环顾四周,在 Rx 中找不到类似的东西?任何人都可以确认/否认这一点吗?

4

5 回答 5

19

这可以通过组合 的内置方法WindowThrottle方法来实现Observable。首先,让我们解决忽略最大计数条件的简单问题:

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay)
{
    var closes = stream.Throttle(delay);
    return stream.Window(() => closes).SelectMany(window => window.ToList());
}

强大的Window方法完成了繁重的工作。现在很容易看到如何添加最大计数:

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, Int32? max=null)
{
    var closes = stream.Throttle(delay);
    if (max != null)
    {
        var overflows = stream.Where((x,index) => index+1>=max);
        closes = closes.Merge(overflows);
    }
    return stream.Window(() => closes).SelectMany(window => window.ToList());
}

我会在我的博客上写一篇文章来解释这一点。https://gist.github.com/2244036

Window 方法的文档:

于 2012-03-20T16:56:58.280 回答
16

我写了一个扩展来做你所追求的大部分 - BufferWithInactivity.

这里是:

public static IObservable<IEnumerable<T>> BufferWithInactivity<T>(
    this IObservable<T> source,
    TimeSpan inactivity,
    int maximumBufferSize)
{
    return Observable.Create<IEnumerable<T>>(o =>
    {
        var gate = new object();
        var buffer = new List<T>();
        var mutable = new SerialDisposable();
        var subscription = (IDisposable)null;
        var scheduler = Scheduler.ThreadPool;

        Action dump = () =>
        {
            var bts = buffer.ToArray();
            buffer = new List<T>();
            if (o != null)
            {
                o.OnNext(bts);
            }
        };

        Action dispose = () =>
        {
            if (subscription != null)
            {
                subscription.Dispose();
            }
            mutable.Dispose();
        };

        Action<Action<IObserver<IEnumerable<T>>>> onErrorOrCompleted =
            onAction =>
            {
                lock (gate)
                {
                    dispose();
                    dump();
                    if (o != null)
                    {
                        onAction(o);
                    }
                }
            };

        Action<Exception> onError = ex =>
            onErrorOrCompleted(x => x.OnError(ex));

        Action onCompleted = () => onErrorOrCompleted(x => x.OnCompleted());

        Action<T> onNext = t =>
        {
            lock (gate)
            {
                buffer.Add(t);
                if (buffer.Count == maximumBufferSize)
                {
                    dump();
                    mutable.Disposable = Disposable.Empty;
                }
                else
                {
                    mutable.Disposable = scheduler.Schedule(inactivity, () =>
                    {
                        lock (gate)
                        {
                            dump();
                        }
                    });
                }
            }
        };

        subscription =
            source
                .ObserveOn(scheduler)
                .Subscribe(onNext, onError, onCompleted);

        return () =>
        {
            lock (gate)
            {
                o = null;
                dispose();
            }
        };
    });
}
于 2011-09-30T00:32:24.150 回答
2

使用 Rx Extensions 2.0,您可以通过接受超时和大小的新缓冲区重载来满足这两个要求:

this.subscription = this.dataService
    .Where(x => !string.Equals("FOO", x.Key.Source))
    .Buffer(TimeSpan.FromMilliseconds(100), 1)
    .ObserveOn(this.dispatcherService)
    .Where(x => x.Count != 0)
    .Subscribe(this.OnBufferReceived);

有关文档,请参阅https://msdn.microsoft.com/en-us/library/hh229200(v=vs.103).aspx

于 2015-03-19T20:06:41.760 回答
0

我想这可以在 Buffer 方法之上实现,如下所示:

public static IObservable<IList<T>> SlidingBuffer<T>(this IObservable<T> obs, TimeSpan span, int max)
        {
            return Observable.CreateWithDisposable<IList<T>>(cl =>
            {
                var acc = new List<T>();
                return obs.Buffer(span)
                        .Subscribe(next =>
                        {
                            if (next.Count == 0) //no activity in time span
                            {
                                cl.OnNext(acc);
                                acc.Clear();
                            }
                            else
                            {
                                acc.AddRange(next);
                                if (acc.Count >= max) //max items collected
                                {
                                    cl.OnNext(acc);
                                    acc.Clear();
                                }
                            }
                        }, err => cl.OnError(err), () => { cl.OnNext(acc); cl.OnCompleted(); });
            });
        }

注意:我没有测试过它,但我希望它能给你这个想法。

于 2011-10-01T08:11:53.707 回答
0

恐慌上校的解决方案几乎是完美的。唯一缺少的是一个Publish组件,以使该解决方案也适用于冷序列。

/// <summary>
/// Projects each element of an observable sequence into a buffer that's sent out
/// when either a given inactivity timespan has elapsed, or it's full,
/// using the specified scheduler to run timers.
/// </summary>
public static IObservable<IList<T>> BufferUntilInactive<T>(
    this IObservable<T> source, TimeSpan dueTime, int maxCount,
    IScheduler scheduler = default)
{
    if (maxCount < 1) throw new ArgumentOutOfRangeException(nameof(maxCount));
    scheduler ??= Scheduler.Default;
    return source.Publish(published =>
    {
        var combinedBoundaries = Observable.Merge
        (
            published.Throttle(dueTime, scheduler),
            published.Skip(maxCount - 1)
        );

        return published
            .Window(() => combinedBoundaries)
            .SelectMany(window => window.ToList());
    });
}

除了添加Publish,我还用.Where((_, index) => index + 1 >= maxCount)等效但更短的.Skip(maxCount - 1). 为了完整起见,还有一个IScheduler参数,用于配置运行计时器的调度程序。

于 2021-11-01T00:13:59.357 回答