23

提供items的是 LINQ 表达式的结果:

var items = from item in ItemsSource.RetrieveItems()
            where ...

假设每个项目的生成需要一些不可忽略的时间。

有两种可能的操作模式:

  1. 使用foreach将允许在集合开始时开始处理项目,比最终可用的项目要早得多。但是,如果我们想稍后再次处理同一个集合,我们将不得不复制保存它:

    var storedItems = new List<Item>();
    foreach(var item in items)
    {
        Process(item);
        storedItems.Add(item);
    }
    
    // Later
    foreach(var item in storedItems)
    {
        ProcessMore(item);
    }
    

    因为如果我们刚刚做了foreach(... in items)thenItemsSource.RetrieveItems()会再次被调用。

  2. 我们可以.ToList()直接使用,但这将迫使我们等待最后一个项目被检索,然后我们才能开始处理第一个项目。

问题:是否有一种IEnumerable实现会像常规 LINQ 查询结果一样第一次迭代,但会在进程中实现,以便第二次foreach迭代存储的值?

4

4 回答 4

12

一个有趣的挑战,所以我必须提供自己的解决方案。事实上非常有趣,我的解决方案现在是版本 3。版本 2 是我根据 Servy 的反馈进行的简化。然后我意识到我的解决方案有很大的缺点。如果缓存的可枚举的第一个枚举没有完成,则不会进行缓存。许多 LINQ 扩展都喜欢First并且Take只会枚举足够的可枚举来完成工作,我必须更新到版本 3 才能使这项工作与缓存一起使用。

问题是关于不涉及并发访问的可枚举的后续枚举。尽管如此,我还是决定让我的解决方案线程安全。它增加了一些复杂性和一些开销,但应该允许在所有场景中使用该解决方案。

public static class EnumerableExtensions {

  public static IEnumerable<T> Cached<T>(this IEnumerable<T> source) {
    if (source == null)
      throw new ArgumentNullException("source");
    return new CachedEnumerable<T>(source);
  }

}

class CachedEnumerable<T> : IEnumerable<T> {

  readonly Object gate = new Object();

  readonly IEnumerable<T> source;

  readonly List<T> cache = new List<T>();

  IEnumerator<T> enumerator;

  bool isCacheComplete;

  public CachedEnumerable(IEnumerable<T> source) {
    this.source = source;
  }

  public IEnumerator<T> GetEnumerator() {
    lock (this.gate) {
      if (this.isCacheComplete)
        return this.cache.GetEnumerator();
      if (this.enumerator == null)
        this.enumerator = source.GetEnumerator();
    }
    return GetCacheBuildingEnumerator();
  }

  public IEnumerator<T> GetCacheBuildingEnumerator() {
    var index = 0;
    T item;
    while (TryGetItem(index, out item)) {
      yield return item;
      index += 1;
    }
  }

  bool TryGetItem(Int32 index, out T item) {
    lock (this.gate) {
      if (!IsItemInCache(index)) {
        // The iteration may have completed while waiting for the lock.
        if (this.isCacheComplete) {
          item = default(T);
          return false;
        }
        if (!this.enumerator.MoveNext()) {
          item = default(T);
          this.isCacheComplete = true;
          this.enumerator.Dispose();
          return false;
        }
        this.cache.Add(this.enumerator.Current);
      }
      item = this.cache[index];
      return true;
    }
  }

  bool IsItemInCache(Int32 index) {
    return index < this.cache.Count;
  }

  IEnumerator IEnumerable.GetEnumerator() {
    return GetEnumerator();
  }

}

扩展名是这样使用的(sequence是一个IEnumerable<T>):

var cachedSequence = sequence.Cached();

// Pulling 2 items from the sequence.
foreach (var item in cachedSequence.Take(2))
  // ...

// Pulling 2 items from the cache and the rest from the source.
foreach (var item in cachedSequence)
  // ...

// Pulling all items from the cache.
foreach (var item in cachedSequence)
  // ...

如果仅枚举部分可枚举对象(例如cachedSequence.Take(2).ToList(). 使用的枚举器ToList将被释放,但底层源枚举器未释放。这是因为前 2 项被缓存并且源枚举器保持活动状态应该对后续项目的请求。在这种情况下,源枚举器仅在符合垃圾收集条件时才被清理(这将与可能的大缓存相同)。

于 2012-09-14T16:11:25.587 回答
8

看看Reactive Extentsions库 - 有一个MemoizeAll()扩展,它会在您的 IEnumerable 中缓存项目,一旦它们被访问,并存储它们以供将来访问。

请参阅Bart De Smet 的这篇MemoizeAll博文,了解有关其他 Rx 方法的详细信息。

编辑:这实际上可以在单独的交互式扩展包中找到 - 可从NuGetMicrosoft 下载

于 2012-09-14T15:20:12.723 回答
3
public static IEnumerable<T> SingleEnumeration<T>(this IEnumerable<T> source)
{
    return new SingleEnumerator<T>(source);
}

private class SingleEnumerator<T> : IEnumerable<T>
{
    private CacheEntry<T> cacheEntry;
    public SingleEnumerator(IEnumerable<T> sequence)
    {
        cacheEntry = new CacheEntry<T>(sequence.GetEnumerator());
    }

    public IEnumerator<T> GetEnumerator()
    {
        if (cacheEntry.FullyPopulated)
        {
            return cacheEntry.CachedValues.GetEnumerator();
        }
        else
        {
            return iterateSequence<T>(cacheEntry).GetEnumerator();
        }
    }

    IEnumerator IEnumerable.GetEnumerator()
    {
        return this.GetEnumerator();
    }
}

private static IEnumerable<T> iterateSequence<T>(CacheEntry<T> entry)
{
    using (var iterator = entry.CachedValues.GetEnumerator())
    {
        int i = 0;
        while (entry.ensureItemAt(i) && iterator.MoveNext())
        {
            yield return iterator.Current;
            i++;
        }
    }
}

private class CacheEntry<T>
{
    public bool FullyPopulated { get; private set; }
    public ConcurrentQueue<T> CachedValues { get; private set; }

    private static object key = new object();
    private IEnumerator<T> sequence;

    public CacheEntry(IEnumerator<T> sequence)
    {
        this.sequence = sequence;
        CachedValues = new ConcurrentQueue<T>();
    }

    /// <summary>
    /// Ensure that the cache has an item a the provided index.  If not, take an item from the 
    /// input sequence and move to the cache.
    /// 
    /// The method is thread safe.
    /// </summary>
    /// <returns>True if the cache already had enough items or 
    /// an item was moved to the cache, 
    /// false if there were no more items in the sequence.</returns>
    public bool ensureItemAt(int index)
    {
        //if the cache already has the items we don't need to lock to know we 
        //can get it
        if (index < CachedValues.Count)
            return true;
        //if we're done there's no race conditions hwere either
        if (FullyPopulated)
            return false;

        lock (key)
        {
            //re-check the early-exit conditions in case they changed while we were
            //waiting on the lock.

            //we already have the cached item
            if (index < CachedValues.Count)
                return true;
            //we don't have the cached item and there are no uncached items
            if (FullyPopulated)
                return false;

            //we actually need to get the next item from the sequence.
            if (sequence.MoveNext())
            {
                CachedValues.Enqueue(sequence.Current);
                return true;
            }
            else
            {
                FullyPopulated = true;
                return false;
            }
        }
    }
}

所以这已经被编辑(基本上)以支持多线程访问。多个线程可以请求项目,并且在逐个项目的基础上,它们将被缓存。它不需要等待整个序列被迭代以返回缓存值。下面是一个示例程序,演示了这一点:

private static IEnumerable<int> interestingIntGenertionMethod(int maxValue)
{
    for (int i = 0; i < maxValue; i++)
    {
        Thread.Sleep(1000);
        Console.WriteLine("actually generating value: {0}", i);
        yield return i;
    }
}

public static void Main(string[] args)
{
    IEnumerable<int> sequence = interestingIntGenertionMethod(10)
        .SingleEnumeration();

    int numThreads = 3;
    for (int i = 0; i < numThreads; i++)
    {
        int taskID = i;
        Task.Factory.StartNew(() =>
        {
            foreach (int value in sequence)
            {
                Console.WriteLine("Task: {0} Value:{1}",
                    taskID, value);
            }
        });
    }

    Console.WriteLine("Press any key to exit...");
    Console.ReadKey(true);
}

你真的需要看到它运行才能了解这里的力量。一旦单个线程强制生成下一个实际值,所有剩余的线程都可以立即打印该生成的值,但如果该线程没有未缓存的值要打印,它们都将等待。(显然,线程/线程池调度可能会导致一项任务花费比需要更长的时间来打印它的值。)

于 2012-09-14T15:10:10.730 回答
0

Martin Liversage 和 Servy已经分别发布了Cached/运算符的线程安全实现,并且System.Interactive包中的线程安全运算符也可用。如果不需要线程安全,并且不希望支付线程同步的成本,那么在这个问题中有提供非同步实现的答案。所有这些实现的共同点是它们都基于自定义类型。我的挑战是在一个独立的扩展方法(不附加任何条件)中编写一个类似的非同步运算符。这是我的实现:SingleEnumerationMemoiseToCachedEnumerable

public static IEnumerable<T> MemoiseNotSynchronized<T>(this IEnumerable<T> source)
{
    // Argument validation omitted
    IEnumerator<T> enumerator = null;
    List<T> buffer = null;
    return Implementation();

    IEnumerable<T> Implementation()
    {
        if (buffer != null && enumerator == null)
        {
            // The source has been fully enumerated
            foreach (var item in buffer) yield return item;
            yield break;
        }

        enumerator ??= source.GetEnumerator();
        buffer ??= new();
        for (int i = 0; ; i = checked(i + 1))
        {
            if (i < buffer.Count)
            {
                yield return buffer[i];
            }
            else if (enumerator.MoveNext())
            {
                Debug.Assert(buffer.Count == i);
                var current = enumerator.Current;
                buffer.Add(current);
                yield return current;
            }
            else
            {
                enumerator.Dispose(); enumerator = null;
                yield break;
            }
        }
    }
}

使用示例:

IEnumerable<Point> points = GetPointsFromDB().MemoiseNotSynchronized();
// Enumerate the 'points' any number of times, on a single thread.
// The data will be fetched from the DB only once.
// The connection with the DB will open when the 'points' is enumerated
// for the first time, partially or fully.
// The connection will stay open until the 'points' is enumerated fully
// for the first time.

在 Fiddle 上测试MemoiseNotSynchronized运算符。

于 2021-08-16T00:49:39.940 回答