22

最后编辑

我选择了Timothy的答案,但如果您想要一个利用 C# yield语句的更可爱的实现,请查看Eamon的答案:https ://stackoverflow.com/a/19825659/145757


默认情况下, LINQ查询是延迟流式传输的。

ToArray/ToList提供完整的缓冲,但首先他们很渴望,其次可能需要相当长的时间才能完成无限序列。

有没有办法结合这两种行为:在生成时动态流式传输缓冲值,以便下一次查询不会触发已经查询的元素的生成。

这是一个基本用例:

static IEnumerable<int> Numbers
{
    get
    {
        int i = -1;

        while (true)
        {
            Console.WriteLine("Generating {0}.", i + 1);
            yield return ++i;
        }
    }
}

static void Main(string[] args)
{
    IEnumerable<int> evenNumbers = Numbers.Where(i => i % 2 == 0);

    foreach (int n in evenNumbers)
    {
        Console.WriteLine("Reading {0}.", n);
        if (n == 10) break;
    }

    Console.WriteLine("==========");

    foreach (int n in evenNumbers)
    {
        Console.WriteLine("Reading {0}.", n);
        if (n == 10) break;
    }
}

这是输出:

Generating 0.
Reading 0.
Generating 1.
Generating 2.
Reading 2.
Generating 3.
Generating 4.
Reading 4.
Generating 5.
Generating 6.
Reading 6.
Generating 7.
Generating 8.
Reading 8.
Generating 9.
Generating 10.
Reading 10.
==========
Generating 0.
Reading 0.
Generating 1.
Generating 2.
Reading 2.
Generating 3.
Generating 4.
Reading 4.
Generating 5.
Generating 6.
Reading 6.
Generating 7.
Generating 8.
Reading 8.
Generating 9.
Generating 10.
Reading 10.

生成代码被触发 22 次。

我希望它被触发 11 次,第一次迭代可枚举。

然后第二次迭代将受益于已经生成的值。

它会是这样的:

IEnumerable<int> evenNumbers = Numbers.Where(i => i % 2 == 0).Buffer();

对于熟悉Rx的人来说,这是一种类似于ReplaySubject.

4

8 回答 8

16

IEnumerable<T>.Buffer()扩展方法

public static EnumerableExtensions
{
    public static BufferEnumerable<T> Buffer(this IEnumerable<T> source)
    {
        return new BufferEnumerable<T>(source);
    }
}

public class BufferEnumerable<T> : IEnumerable<T>, IDisposable
{
    IEnumerator<T> source;
    List<T> buffer;
    public BufferEnumerable(IEnumerable<T> source)
    {
        this.source = source.GetEnumerator();
        this.buffer = new List<T>();
    }
    public IEnumerator<T> GetEnumerator()
    {
        return new BufferEnumerator<T>(source, buffer);
    }
    public void Dispose()
    {
        source.Dispose()
    }
}

public class BufferEnumerator<T> : IEnumerator<T>
{
    IEnumerator<T> source;
    List<T> buffer;
    int i = -1;
    public BufferEnumerator(IEnumerator<T> source, List<T> buffer)
    {
        this.source = source;
        this.buffer = buffer;
    }
    public T Current
    {
        get { return buffer[i]; }
    }
    public bool MoveNext()
    {
        i++;
        if (i < buffer.Count)
            return true;
        if (!source.MoveNext())
            return false;
        buffer.Add(source.Current);
        return true;
    }
    public void Reset()
    {
        i = -1;
    }
    public void Dispose()
    {
    }
}

用法

using (var evenNumbers = Numbers.Where(i => i % 2 == 0).Buffer())
{
    ...
}

注释

这里的关键点是,IEnumerable<T> source作为方法输入的给定BufferGetEnumerator调用了一次,而不管Buffer枚举结果的次数。结果的所有枚举器Buffer共享相同的源枚举器和内部列表。

于 2013-11-06T23:16:57.463 回答
8

为此,您可以使用Microsoft.FSharp.Collections.LazyList<>F# 电源组中的类型(是的,来自未安装 F# 的 C# - 没问题!)。它在 Nuget 包FSPowerPack.Core.Community中。

特别是,您想调用LazyListModule.ofSeq(...)which 返回一个LazyList<T>实现IEnumerable<T>并且是惰性和缓存的。

在你的情况下,使用只是一个问题......

var evenNumbers = LazyListModule.ofSeq(Numbers.Where(i => i % 2 == 0));
var cachedEvenNumbers = LazyListModule.ofSeq(evenNumbers);

尽管我个人更喜欢var在所有这些情况下,但请注意,这确实意味着编译时类型将比仅仅更具体IEnumerable<>- 并不是说​​这可能是一个缺点。F# 非接口类型的另一个优点是它们公开了一些使用普通 IEnumerable 无法有效执行的高效操作,例如LazyListModule.skip.

我不确定是否LazyList是线程安全的,但我怀疑它是。


下面的评论中指出的另一种选择(如果您安装了 F#)是SeqModule.Cache(命名空间Microsoft.FSharp.Collections,它将在 GACed 程序集 FSharp.Core.dll 中),它具有相同的有效行为。与其他 .NET 枚举一样,Seq.cache它没有可以有效链接的尾部(或跳过)运算符。

线程安全:与此问题的其他解决方案不同,Seq.cache是线程安全的,因为您可以让多个枚举器并行运行(每个枚举器都不是线程安全的)。

性能我做了一个快速基准测试,LazyList可枚举的开销至少是变体的 4 倍SeqModule.Cache,而变体的开销至少是自定义实现答案的 3 倍。因此,虽然 F# 变体可以工作,但它们并没有那么快。请注意,与执行(例如)I/O 或任何非平凡计算的可枚举相比,慢 3-12 倍仍然不是很慢,所以这在大多数情况下可能无关紧要,但最好保持头脑。

TL;DR如果您需要一个高效的、线程安全的缓存枚举,只需使用SeqModule.Cache.

于 2013-11-06T23:28:49.063 回答
7

我希望这个答案结合了sinelaw 答案的简洁明了以及对Timothy 答案的多个枚举的支持:

public static IEnumerable<T> Cached<T>(this IEnumerable<T> enumerable) {
    return CachedImpl(enumerable.GetEnumerator(), new List<T>());
}

static IEnumerable<T> CachedImpl<T>(IEnumerator<T> source, List<T> buffer) {
    int pos=0;
    while(true) {
        if(pos == buffer.Count) 
            if (source.MoveNext()) 
                buffer.Add(source.Current); 
            else 
                yield break;
        yield return buffer[pos++];
    }
}

关键思想是使用yield return语法来实现一个简短的可枚举实现,但是您仍然需要一个状态机来决定您是否可以从缓冲区中获取下一个元素,或者您是否需要检查底层枚举器。

限制: 这不会尝试线程安全,也不会释放底层枚举器(这通常是非常棘手的,因为只要仍然可以使用任何缓存的枚举器,底层的未缓存枚举器就必须保持未释放)。

于 2013-11-07T00:22:08.990 回答
7

基于上面 Eamon 的回答,这是另一个功能性解决方案(没有新类型),它也适用于同时评估。这表明一般模式(共享状态的迭代)是这个问题的基础。

首先,我们定义了一个非常通用的辅助方法,旨在允许我们模拟C# 中匿名迭代器的缺失特性:

public static IEnumerable<T> Generate<T>(Func<Func<Tuple<T>>> generator)
{
    var tryGetNext = generator();
    while (true)
    {
        var result = tryGetNext();
        if (null == result)
        {
            yield break;
        }
        yield return result.Item1;
    }
}

Generate 就像一个带有状态的聚合器。它接受一个返回初始状态的函数,以及一个匿名的生成器函数yield return,如果它在 C# 中被允许的话。返回的状态initialize是每个枚举,而更全局的状态(在所有枚举之间共享)可以由调用者维护,例如在闭包变量中,如下所示。

现在我们可以将它用于“缓冲的可枚举”问题:

public static IEnumerable<T> Cached<T>(IEnumerable<T> enumerable)
{
    var cache = new List<T>();
    var enumerator = enumerable.GetEnumerator();

    return Generate<T>(() =>
    {
        int pos = -1;
        return () => {
            pos += 1;
            if (pos < cache.Count())
            {
                return new Tuple<T>(cache[pos]);
            }
            if (enumerator.MoveNext())
            {
                cache.Add(enumerator.Current);
                return new Tuple<T>(enumerator.Current);
            }
            return null;
        };
    });
}
于 2013-11-07T05:54:52.537 回答
4

据我所知,没有内置的方法可以做到这一点,这 - 现在你提到它 - 有点令人惊讶(我的猜测是,考虑到人们想要使用这个选项的频率,它可能不值得分析代码以确保生成器每次都给出完全相同的序列所需的努力)。

但是,您可以自己实现它。最简单的方法是在呼叫站点上,如

var evenNumbers = Numbers.Where(i => i % 2 == 0).
var startOfList = evenNumbers.Take(10).ToList();

// use startOfList instead of evenNumbers in the loop.

更一般和更准确地,您可以在生成器中执行此操作:创建一个List<int> cache并且每次生成一个新数字时将其添加到cache您之前的yield return。然后当你再次循环时,首先提供所有缓存的数字。例如

List<int> cachedEvenNumbers = new List<int>();
IEnumerable<int> EvenNumbers
{
  get
  {
    int i = -1;

    foreach(int cached in cachedEvenNumbers)
    {
      i = cached;
      yield return cached;
    }

    // Note: this while loop now starts from the last cached value
    while (true) 
    {
        Console.WriteLine("Generating {0}.", i + 1);
        yield return ++i;
    }
  }
}

我想如果您考虑足够长的时间,您可能会想出IEnumerable<T>.Buffered()扩展方法的一般实现 - 再次,要求是枚举在调用之间不会改变,问题是它是否值得。

于 2013-11-06T23:25:28.363 回答
3

这是一个不完整但紧凑的“功能”实现(未定义新类型)。

错误是它不允许同时枚举。


原始描述:第一个函数应该是第二个函数中的匿名 lambda,但C# 不允许yield在匿名 lambdas 中使用:

// put these in some extensions class

private static IEnumerable<T> EnumerateAndCache<T>(IEnumerator<T> enumerator, List<T> cache)
{
    while (enumerator.MoveNext())
    {
        var current = enumerator.Current;
        cache.Add(current);
        yield return current;
    }
}
public static IEnumerable<T> ToCachedEnumerable<T>(this IEnumerable<T> enumerable)
{
    var enumerator = enumerable.GetEnumerator();
    var cache = new List<T>();
    return cache.Concat(EnumerateAndCache(enumerator, cache));
}

用法:

var enumerable = Numbers.ToCachedEnumerable();
于 2013-11-06T23:56:27.593 回答
3

完全归功于Eamon Nerbonnesinelaw的回答,只是一些调整!首先,在完成时释放枚举器。其次,用锁保护底层枚举器,以便可以在多个线程上安全地使用枚举器。

// This is just the same as @sinelaw's Generator but I didn't like the name
public static IEnumerable<T> AnonymousIterator<T>(Func<Func<Tuple<T>>> generator)
{
    var tryGetNext = generator();
    while (true)
    {
        var result = tryGetNext();
        if (null == result)
        {
            yield break;
        }
        yield return result.Item1;
    }
}

// Cached/Buffered/Replay behaviour
public static IEnumerable<T> Buffer<T>(this IEnumerable<T> self)
{
    // Rows are stored here when they've been fetched once
    var cache = new List<T>();

    // This counter is thread-safe in that it is incremented after the item has been added to the list,
    // hence it will never give a false positive. It may give a false negative, but that falls through
    // to the code which takes the lock so it's ok.
    var count = 0;

    // The enumerator is retained until it completes, then it is discarded.
    var enumerator = self.GetEnumerator();

    // This lock protects the enumerator only. The enumerable could be used on multiple threads
    // and the enumerator would then be shared among them, but enumerators are inherently not
    // thread-safe so a) we must protect that with a lock and b) we don't need to try and be
    // thread-safe in our own enumerator
    var lockObject = new object();

    return AnonymousIterator<T>(() =>
    {
        int pos = -1;
        return () =>
        {
            pos += 1;
            if (pos < count)
            {
                return new Tuple<T>(cache[pos]);
            }
            // Only take the lock when we need to
            lock (lockObject)
            {
                // The counter could have been updated between the check above and this one,
                // so now we have the lock we must check again
                if (pos < count)
                {
                    return new Tuple<T>(cache[pos]);
                }

                // Enumerator is set to null when it has completed
                if (enumerator != null)
                {
                    if (enumerator.MoveNext())
                    {
                        cache.Add(enumerator.Current);
                        count += 1;
                        return new Tuple<T>(enumerator.Current);
                    }
                    else
                    {
                        enumerator = null;
                    }
                }
            }
        }
        return null;
    };
});

}

于 2015-01-02T14:47:38.860 回答
0

我使用以下扩展方法。

这样,输入以最大速度读取,消费者以最大速度处理。

public static IEnumerable<T> Buffer<T>(this IEnumerable<T> input)
{
    var blockingCollection = new BlockingCollection<T>();

    //read from the input
    Task.Factory.StartNew(() =>
    {
        foreach (var item in input)
        {
            blockingCollection.Add(item);
        }

        blockingCollection.CompleteAdding();
    });

    foreach (var item in blockingCollection.GetConsumingEnumerable())
    {
        yield return item;
    }
}

示例用法

这个例子有一个快速的生产者(查找文件)和一个慢的消费者(上传文件)。

long uploaded = 0;
long total = 0;

Directory
    .EnumerateFiles(inputFolder, "*.jpg", SearchOption.AllDirectories)
    .Select(filename =>
    {
        total++;
        return filename;
    })
    .Buffer()
    .ForEach(filename =>
    {
        //pretend to do something slow, like upload the file.
        Thread.Sleep(1000);
        uploaded++;

        Console.WriteLine($"Uploaded {uploaded:N0}/{total:N0}");
    });

在此处输入图像描述

于 2021-12-17T08:17:09.917 回答