4

假设我有一些IEnumerator<T>在方法内部进行了大量处理MoveNext()

从该枚举器消耗的代码不仅消耗与可用数据一样快,而且偶尔会等待(其细节与我的问题无关)以同步需要恢复消耗的时间。但是当它下一次调用 时MoveNext(),它需要尽可能快的数据。

一种方法是将整个流预先消耗到某个列表或数组结构中以进行即时枚举。然而,这将浪费内存,因为在任何一个时间点,只有一个项目正在使用,并且在整个数据不适合内存的情况下,这将是令人望而却步的。

那么.net中是否有一些通用的东西以某种方式包装枚举器/可枚举,它预先异步预迭代底层枚举器几个项目并缓冲结果,以便它始终在其缓冲区中有许多可用的项目和调用 MoveNext 将永远不必等待?显然,消耗的项目,即由调用者的后续 MoveNext 迭代的项目,将从缓冲区中删除。

注意我正在尝试做的部分工作也称为Backpressure,并且在 Rx 世界中,已经在RxJava中实现,并且正在Rx.NET中进行讨论。Rx(推送数据的 observables)可以被认为是枚举器的相反方法(枚举器允许拉取数据)。背压在拉取方法中相对容易,正如我的回答所示:只需暂停消费。推动时更难,需要额外的反馈机制。

4

2 回答 2

4

您的自定义可枚举类的更简洁的替代方法是执行以下操作:

public static IEnumerable<T> Buffer<T>(this IEnumerable<T> source, int bufferSize)
{
    var queue = new BlockingCollection<T>(bufferSize);

    Task.Run(() => {
        foreach(var i in source) queue.Add(i);
        queue.CompleteAdding();
    });

    return queue.GetConsumingEnumerable();
}

这可以用作:

var slowEnumerable = GetMySlowEnumerable();
var buffered = slowEnumerable.Buffer(10); // Populates up to 10 items on a background thread
于 2015-06-09T01:30:01.627 回答
1

有不同的方法可以自己实现,我决定使用

  • 每个枚举器有一个专用线程,执行异步预缓冲
  • 要预缓冲的固定数量的元素

这对于我手头的情况来说是完美的(只有几个,非常长时间运行的枚举器),但是如果你使用大量的枚举器,创建一个线程可能会太重,如果你使用固定数量的元素可能太不灵活需要更动态的东西,可能基于项目的实际内容。

到目前为止,我只测试了它的主要功能,可能还存在一些粗糙的边缘。它可以这样使用:

int bufferSize = 5;
IEnumerable<int> en = ...;
foreach (var item in new PreBufferingEnumerable<int>(en, bufferSize))
{
    ...

这是枚举器的要点:

class PreBufferingEnumerator<TItem> : IEnumerator<TItem>
{
    private readonly IEnumerator<TItem> _underlying;
    private readonly int _bufferSize;
    private readonly Queue<TItem> _buffer;
    private bool _done;
    private bool _disposed;

    public PreBufferingEnumerator(IEnumerator<TItem> underlying, int bufferSize)
    {
        _underlying = underlying;
        _bufferSize = bufferSize;
        _buffer = new Queue<TItem>();
        Thread preBufferingThread = new Thread(PreBufferer) { Name = "PreBufferingEnumerator.PreBufferer", IsBackground = true };
        preBufferingThread.Start();
    }

    private void PreBufferer()
    {
        while (true)
        {
            lock (_buffer)
            {
                while (_buffer.Count == _bufferSize && !_disposed)
                    Monitor.Wait(_buffer);
                if (_disposed)
                    return;
            }
            if (!_underlying.MoveNext())
            {
                lock (_buffer)
                    _done = true;
                return;
            }
            var current = _underlying.Current; // do outside lock, in case underlying enumerator does something inside get_Current()
            lock (_buffer)
            {
                _buffer.Enqueue(current);
                Monitor.Pulse(_buffer);
            }
        }
    }

    public bool MoveNext()
    {
        lock (_buffer)
        {
            while (_buffer.Count == 0 && !_done && !_disposed)
                Monitor.Wait(_buffer);
            if (_buffer.Count > 0)
            {
                Current = _buffer.Dequeue();
                Monitor.Pulse(_buffer); // so PreBufferer thread can fetch more
                return true;
            }
            return false; // _done || _disposed
        }
    }

    public TItem Current { get; private set; }

    public void Dispose()
    {
        lock (_buffer)
        {
            if (_disposed)
                return;
            _disposed = true;
            _buffer.Clear();
            Current = default(TItem);
            Monitor.PulseAll(_buffer);
        }
    }
于 2015-06-08T04:11:30.933 回答