1

读取和写入 2 个并行任务,如下所示:

Task[] tasks = new Task[2];
var entityCollection = new BlockingCollection<Dictionary<String, object>>();

tasks[0] = Task.Factory.StartNew(() => ReadData(entityCollection), TaskCreationOptions.LongRunning);
tasks[1] = Task.Factory.StartNew(() => WriteJsontoFile(JSONFileName, entityCollection), TaskCreationOptions.LongRunning);

Task.WaitAll(tasks);

阅读任务:

private void ReadData(BlockingCollection<Dictionary<String, object>> collection)
{
    do
    {
        //continuously data is being read in to entities, this part is working fine and then adding it to collection of BlockingCollection type to be consumed in Write task
        entitites.ToList().ForEach(e => collection.Add(e));
    } while (true);
    collection.CompleteAdding();
}

写任务:

private void WriteJsontoFile(String JsonFileName, BlockingCollection<Dictionary<String, object>> source)
{
    using (StreamWriter sw = new StreamWriter(JsonFileName, true))
    {
        Parallel.ForEach(source.GetConsumingPartitioner(), (line) => ser.Serialize(sw, line));
    }
}

GetConsumingPartitioner() 相关代码:

public static class BlockingCollection
{
    public static Partitioner<T> GetConsumingPartitioner<T>(
    this BlockingCollection<T> collection)
    {
        return new BlockingCollectionPartitioner<T>(collection);
    }
}

class BlockingCollectionPartitioner<T> : Partitioner<T>
{
    private BlockingCollection<T> _collection;

    internal BlockingCollectionPartitioner(BlockingCollection<T> collection)
    {
        if (collection == null)
            throw new ArgumentNullException("collection");
        _collection = collection;
    }

    public override bool SupportsDynamicPartitions
    {
        get { return true; }
    }

    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        if (partitionCount < 1)
            throw new ArgumentOutOfRangeException("partitionCount");
        var dynamicPartitioner = GetDynamicPartitions();
        return Enumerable.Range(0, partitionCount).Select(_ =>
            dynamicPartitioner.GetEnumerator()).ToArray();
    }

    public override IEnumerable<T> GetDynamicPartitions()
    {
        return _collection.GetConsumingEnumerable();
    }
}

我在写任务中得到以下异常:

计数不能小于零。\r\n参数名称: count

4

1 回答 1

1

这不是消费的标准语法

BlockingCollection 类

// Consume consume the BlockingCollection 
while (true) Console.WriteLine(bc.Take());
于 2013-10-27T13:08:55.377 回答