读取和写入 2 个并行任务,如下所示:
Task[] tasks = new Task[2];
var entityCollection = new BlockingCollection<Dictionary<String, object>>();
tasks[0] = Task.Factory.StartNew(() => ReadData(entityCollection), TaskCreationOptions.LongRunning);
tasks[1] = Task.Factory.StartNew(() => WriteJsontoFile(JSONFileName, entityCollection), TaskCreationOptions.LongRunning);
Task.WaitAll(tasks);
阅读任务:
private void ReadData(BlockingCollection<Dictionary<String, object>> collection)
{
do
{
//continuously data is being read in to entities, this part is working fine and then adding it to collection of BlockingCollection type to be consumed in Write task
entitites.ToList().ForEach(e => collection.Add(e));
} while (true);
collection.CompleteAdding();
}
写任务:
private void WriteJsontoFile(String JsonFileName, BlockingCollection<Dictionary<String, object>> source)
{
using (StreamWriter sw = new StreamWriter(JsonFileName, true))
{
Parallel.ForEach(source.GetConsumingPartitioner(), (line) => ser.Serialize(sw, line));
}
}
GetConsumingPartitioner() 相关代码:
public static class BlockingCollection
{
public static Partitioner<T> GetConsumingPartitioner<T>(
this BlockingCollection<T> collection)
{
return new BlockingCollectionPartitioner<T>(collection);
}
}
class BlockingCollectionPartitioner<T> : Partitioner<T>
{
private BlockingCollection<T> _collection;
internal BlockingCollectionPartitioner(BlockingCollection<T> collection)
{
if (collection == null)
throw new ArgumentNullException("collection");
_collection = collection;
}
public override bool SupportsDynamicPartitions
{
get { return true; }
}
public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
{
if (partitionCount < 1)
throw new ArgumentOutOfRangeException("partitionCount");
var dynamicPartitioner = GetDynamicPartitions();
return Enumerable.Range(0, partitionCount).Select(_ =>
dynamicPartitioner.GetEnumerator()).ToArray();
}
public override IEnumerable<T> GetDynamicPartitions()
{
return _collection.GetConsumingEnumerable();
}
}
我在写任务中得到以下异常:
计数不能小于零。\r\n参数名称: count