找到了解决方法。
首先,让我澄清一下最初的问题。我需要一个能处理无限序列、并且可以暂停的管道。管道包括以下步骤:
1. 同步读取序列:`Enum()`
2. 并行处理项目:`AsParallel().Select(a => a)`
3. 同步处理结果:`foreach` 循环体
第 3 步可能会暂停管道,这里用 `Thread.Sleep()` 来模拟。问题在于,管道暂停时,第 2 步会预先获取过多的元素。PLINQ 内部必定有某种队列,其大小无法显式配置,而是取决于 `ParallelMergeOptions`。`ParallelMergeOptions.NotBuffered` 可以减小队列大小,但对我来说仍然太大。
我的解决方法是:跟踪正在处理的项目数量,达到上限时停止并行处理,等管道恢复消费后再重新启动并行处理。
// Number of items pulled from SourceEnum() since the last reset (tests reset it to 0).
int sourceCounter;

/// <summary>
/// Infinite input sequence. Each step atomically increments <see cref="sourceCounter"/>
/// and yields the incremented value, so the field doubles as a count of fetched items.
/// </summary>
IEnumerable<int> SourceEnum() // infinite input sequence
{
    for (;;)
    {
        // Interlocked because PLINQ may pull from this sequence on worker threads.
        int next = Interlocked.Increment(ref sourceCounter);
        yield return next;
    }
}
/// <summary>
/// Shows how far plain PLINQ (even with <c>NotBuffered</c>) reads ahead of a paused consumer:
/// takes the first result, pauses for 3 seconds while the enumerator is still open,
/// then prints how many items were pulled from the source.
/// </summary>
[TestMethod]
public void PlainPLinq_PausedConsumtionTest()
{
    sourceCounter = 0;

    var query = SourceEnum()
        .AsParallel()
        .WithMergeOptions(ParallelMergeOptions.NotBuffered)
        .Select(a => a);

    using (var results = query.GetEnumerator())
    {
        if (results.MoveNext())
        {
            // Simulate a paused pipeline; the query stays alive until the using block ends.
            Thread.Sleep(3000);
        }
    }

    Console.WriteLine("fetched from source sequence: {0}", sourceCounter); // prints 4544 on my machine
}
/// <summary>
/// Drives MyParallelSelect with a consumer that never pauses until more than one million
/// items have been pulled from the source, then prints the fetch count.
/// </summary>
[TestMethod]
public void MyParallelSelect_NormalConsumtionTest()
{
    sourceCounter = 0;

    using (var results = MyParallelSelect(SourceEnum(), 64, a => a).GetEnumerator())
    {
        // Keep consuming while the source has produced at most one million items.
        while (results.MoveNext() && sourceCounter <= 1000000)
        {
        }
    }

    Console.WriteLine("fetched from source sequence: {0}", sourceCounter);
}
/// <summary>
/// Same pause scenario as the plain PLINQ test, but through MyParallelSelect with a limit of
/// 64 in-flight items. The printed fetch count is expected to stay near that limit rather
/// than the thousands seen with plain PLINQ.
/// </summary>
[TestMethod]
public void MyParallelSelect_PausedConsumtionTest()
{
    sourceCounter = 0;

    var results = MyParallelSelect(SourceEnum(), 64, a => a);

    using (var cursor = results.GetEnumerator())
    {
        if (cursor.MoveNext())
        {
            // Simulate a paused pipeline while the parallel stage is still running.
            Thread.Sleep(3000);
        }
    }

    Console.WriteLine("fetched from source sequence: {0}", sourceCounter);
}
/// <summary>
/// Mutable reference-type box around a value of type <typeparamref name="TData"/> (class or struct).
/// It lets a struct such as an <c>int</c> counter be shared by reference between iterators.
/// A <c>null</c> box can also act as a marker that is distinct from every <typeparamref name="TData"/> value.
/// </summary>
class DataHolder<TData>
{
    /// <summary>The boxed value. It is a field, not a property, so it can be passed by <c>ref</c> (e.g. to Interlocked).</summary>
    public TData Data;
}
/// <summary>
/// Lazily pulls items from a shared enumerator, wrapping each one in a <see cref="DataHolder{D}"/>.
/// It stops as soon as <paramref name="queueSize"/> items are counted as in flight.
/// </summary>
/// <param name="sourceEnumerator">Enumerator shared across restarts; it is advanced but never disposed here.</param>
/// <param name="itemsBeingProcessed">
/// In-flight counter. It is incremented for every element this sequence yields, including the
/// end-of-source marker. The consumer (MyParallelSelect) decrements it as results are consumed.
/// </param>
/// <param name="queueSize">Maximum number of in-flight items.</param>
/// <returns>
/// Wrapped source items. When the source is exhausted, a single <c>null</c> element is yielded
/// and the sequence ends.
/// </returns>
static IEnumerable<DataHolder<T>> FetchSourceItems<T>(IEnumerator<T> sourceEnumerator, DataHolder<int> itemsBeingProcessed, int queueSize)
{
    while (true)
    {
        // Reserve a slot before touching the source.
        int inFlight = Interlocked.Increment(ref itemsBeingProcessed.Data);
        if (inFlight > queueSize)
        {
            // Enough items are already being processed: give the slot back and stop
            // feeding the parallel stage. The caller restarts once everything has drained.
            Interlocked.Decrement(ref itemsBeingProcessed.Data);
            yield break;
        }

        if (!sourceEnumerator.MoveNext())
        {
            // A null holder signals end of source to the consumer.
            yield return null;
            yield break;
        }

        yield return new DataHolder<T> { Data = sourceEnumerator.Current };
    }
}
/// <summary>
/// Projects <paramref name="source"/> with <paramref name="selector"/> in parallel (PLINQ).
/// It bounds how far the parallel stage may run ahead of the consumer.
/// </summary>
/// <remarks>
/// At most <paramref name="queueSize"/> items are in flight at once: fetched from the source
/// but not yet handed to the caller. When that limit is reached, the current PLINQ query is
/// allowed to drain and a new one is started over the same source enumerator. As a result, a
/// paused consumer causes only a bounded number of extra source reads.
/// Results are produced in no particular order.
/// </remarks>
/// <param name="source">Input sequence; may be infinite. It is enumerated exactly once.</param>
/// <param name="queueSize">Maximum number of in-flight items; must be positive.</param>
/// <param name="selector">Projection applied to each item on PLINQ worker threads.</param>
/// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="selector"/> is null.</exception>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="queueSize"/> is zero or negative.</exception>
IEnumerable<OutT> MyParallelSelect<T, OutT>(IEnumerable<T> source, int queueSize, Func<T, OutT> selector)
{
    // Validate eagerly. The original iterator spun forever when queueSize <= 0, because
    // FetchSourceItems then yields nothing and the restart loop never ends.
    if (source == null)
        throw new ArgumentNullException(nameof(source));
    if (selector == null)
        throw new ArgumentNullException(nameof(selector));
    if (queueSize <= 0)
        throw new ArgumentOutOfRangeException(nameof(queueSize), "queueSize must be positive.");

    return MyParallelSelectIterator(source, queueSize, selector);
}

// Iterator body of MyParallelSelect; arguments are already validated.
static IEnumerable<OutT> MyParallelSelectIterator<T, OutT>(IEnumerable<T> source, int queueSize, Func<T, OutT> selector)
{
    var itemsBeingProcessed = new DataHolder<int>();
    using (var sourceEnumerator = source.GetEnumerator())
    {
        bool endOfSource = false;
        while (!endOfSource) // restart parallel processing after each drained batch
        {
            var batch = FetchSourceItems(sourceEnumerator, itemsBeingProcessed, queueSize)
                .AsParallel()
                .WithMergeOptions(ParallelMergeOptions.NotBuffered)
                .Select(inData => inData != null ? new DataHolder<OutT> { Data = selector(inData.Data) } : null);

            foreach (var outData in batch)
            {
                Interlocked.Decrement(ref itemsBeingProcessed.Data);
                if (outData == null)
                {
                    // EOF marker. The query is unordered, so items fetched before the marker
                    // may still be in progress on other workers. Keep draining this batch
                    // instead of breaking out (which would drop them), then stop.
                    endOfSource = true;
                    continue;
                }
                yield return outData.Data;
            }
        }
    }
}