1

如何限制 AsParallel() 预先读取并放入其内部缓冲区的项目数量?

这是一个例子:

int returnedCounter;

IEnumerable<int> Enum()
{
    while (true)
        yield return Interlocked.Increment(ref returnedCounter);
}

[TestMethod]
public void TestMethod1()
{
    foreach (var i in Enum().AsParallel().Select(a => a))
    {
        Thread.Sleep(3000);
        break;
    }
    Console.WriteLine(returnedCounter);
}

我消耗 1 项,睡觉,停止枚举。它在我的机器上打印 526400。在我的真实项目中,每个项目分配数千字节。AsParallel() 会预先读取很多项目,这会导致非常糟糕的内存消耗和 CPU 浪费。

放置 WithMergeOptions(ParallelMergeOptions.NotBuffered) 会有所帮助。它打印 4544。但对我来说仍然太多了。

在 Enum() 中等待会冻结主线程中的循环。

4

2 回答 2

4

关于分区器的另一个问题!

在您的情况下,您将必须找到/编写一个一次只需要一个项目的分区器。

这是一篇关于自定义分区的文章


更新:

我只记得我在哪里看到了一个SingleItemPartitioner实现:它在ParallelExtensionsExtras这里的项目中:使用 .NET Framework 进行并行编程的示例

我也刚刚阅读了您的测试代码。我可能应该第一次这样做!

这段代码:

Enum().AsParallel().Select(a => a)

意思是:Enum()尽可能快地并行获取并枚举它,并返回一个新的IEnumerable<int>.

因此,您foreach不是从中提取项目Enum()- 它是从IEnumerable<int>linq 语句创建的新项目中提取项目。

此外,您foreach在主线程上运行,因此每个项目的工作都是单线程的。

如果您想并行运行,但只在需要时产生一个项目,请尝试:

Parallel.ForEach( SingleItemPartitioner.Create( Enum() ), ( i, state ) =>
    {
        Thread.Sleep( 3000 );
        state.Break();
    }
于 2011-12-07T15:59:32.927 回答
0

找到了解决方法。

首先,让我澄清一下最初的问题。我需要一个可在无限序列上工作的可暂停管道。管道是:

  1. 同步读取序列:Enum()
  2. 并行处理项目:AsParallel().Select(a => a)
  3. 进行同步处理:foreachbody

第 3 步可能会暂停管道。那是仿效的Sleep()。问题是当管道暂停时,第 2 步提前获取了太多元素。Plinq 必须有一些内部队列。队列大小不能显式配置。大小取决于ParallelMergeOptionsParallelMergeOptions.NotBuffered降低队列大小,但大小对我来说仍然太大。

我的解决方法是知道有多少项目正在处理,达到限制时停止并行处理,当管道再次启动时重新启动并行处理。

int sourceCounter;

IEnumerable<int> SourceEnum() // infinite input sequence
{
    while (true)
        yield return Interlocked.Increment(ref sourceCounter);
}

[TestMethod]
public void PlainPLinq_PausedConsumtionTest()
{
    sourceCounter = 0;
    foreach (var i in SourceEnum().AsParallel().WithMergeOptions(ParallelMergeOptions.NotBuffered).Select(a => a))
    {
        Thread.Sleep(3000);
        break;
    }
    Console.WriteLine("fetched from source sequence: {0}", sourceCounter); // prints 4544 on my machine
}

[TestMethod]
public void MyParallelSelect_NormalConsumtionTest()
{
    sourceCounter = 0;
    foreach (var i in MyParallelSelect(SourceEnum(), 64, a => a))
    {
        if (sourceCounter > 1000000)
            break;
    }
    Console.WriteLine("fetched from source sequence: {0}", sourceCounter);
}

[TestMethod]
public void MyParallelSelect_PausedConsumtionTest()
{
    sourceCounter = 0;
    foreach (var i in MyParallelSelect(SourceEnum(), 64, a => a))
    {
        Thread.Sleep(3000);
        break;
    }
    Console.WriteLine("fetched from source sequence: {0}", sourceCounter);
}

class DataHolder<D> // reference type to store class or struct D
{
    public D Data;
}

static IEnumerable<DataHolder<T>> FetchSourceItems<T>(IEnumerator<T> sourceEnumerator, DataHolder<int> itemsBeingProcessed, int queueSize)
{
    for (; ; )
    {
        var holder = new DataHolder<T>();
        if (Interlocked.Increment(ref itemsBeingProcessed.Data) > queueSize)
        {
            // many enought items are already being processed - stop feeding parallel processing
            Interlocked.Decrement(ref itemsBeingProcessed.Data);
            yield break;
        }
        if (sourceEnumerator.MoveNext())
        {
            holder.Data = sourceEnumerator.Current;
            yield return holder;
        }
        else
        {
            yield return null; // return null DataHolder to indicate EOF
            yield break;
        }
    }
}

IEnumerable<OutT> MyParallelSelect<T, OutT>(IEnumerable<T> source, int queueSize, Func<T, OutT> selector)
{
    var itemsBeingProcessed = new DataHolder<int>();
    using (var sourceEnumerator = source.GetEnumerator())
    {
        for (;;) // restart parallel processing
        {
            foreach (var outData in FetchSourceItems(sourceEnumerator, itemsBeingProcessed, queueSize).AsParallel().WithMergeOptions(ParallelMergeOptions.NotBuffered).Select(
                inData => inData != null ? new DataHolder<OutT> { Data = selector(inData.Data) } : null))
            {
                Interlocked.Decrement(ref itemsBeingProcessed.Data);
                if (outData == null)
                    yield break; // EOF reached
                yield return outData.Data;
            }
        }
    }
}
于 2011-12-09T11:31:04.910 回答