1

我有一种情况,我正在运行一些任务,每个任务需要几秒钟到几分钟。我也有可能添加更多需要添加到已经运行的并行循环的数据。是否可以更新 Parallel.For 正在使用的当前集合并让它继续迭代它,直到没有更多对象要检索?这是一些显示我的问题的示例代码:

[Test]
public void DoesParallelForGetNewEntriesInLoop()
{
    ConcurrentDictionary<int, string> dict = new ConcurrentDictionary<int, string>();
    ConcurrentBag<string> bag = new ConcurrentBag<string>();
    int i = 0;
    // write to dictionary every 10ms simulating new additions
    Timer t = new Timer(callback =>
    {
        dict.TryAdd(i++, "Value" + i);
    }, dict, 0, 10);
    // Add initial values
    dict.TryAdd(i++, "Value" + i);
    dict.TryAdd(i++, "Value" + i);
    dict.TryAdd(i++, "Value" + i);

    Parallel.For(0, dict.Count, (a, state) =>
    {
        string val = string.Empty;
        if (dict.TryGetValue(a, out val))
        {
            bag.Add(val + Environment.NewLine);
        }
        if (i++ == 50)
            state.Stop();
        Thread.Sleep(5000);

    });
    foreach (var item in bag)
    {
        File.AppendAllText("parallelWrite.txt", item);
    }            
}

当我运行它时,我得到的结果很简单:

Value2
Value1
Value3
Value4

有没有更好的方法来做我想做的事情?

4

2 回答 2

2

如何使用BlockingCollection和调用GetConsumingEnumerable()你的Parallel.ForEach

BlockingCollection<string> collection = new BlockingCollection<string>();

Parallel.ForEach(collection.GetConsumingEnumerable(), (x) => Console.WriteLine(x));

Add()您可以使用 BlockingCollection 的方法将内容添加到集合中。

从技术上讲,存在“双重锁定”,因为 Parallel.ForEach 在从可枚举项中获取大块项目进行处理时会锁定集合,并且 BlockingCollection 是为支持多个消费者而构建的,因此它也实现了锁定。如果这成为性能问题(很可能),那么您可以为 BlockingCollection 实现自己的分区器,因为 Parallel.ForEach 具有接受 OrderablePartitioner 和 Partitioner 的重载。有一篇很好的文章描述了这里的方法:http: //blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx

于 2015-02-05T20:19:32.247 回答
0

get 中的 from 和 to 参数Parallel.For仅在循环开始前计算一次。使用Parallel.ForEach迭代新项目。我不确定您要实现什么,但更好的方法可能是将新数据放入堆栈/队列中并定期弹出数据并处理它。

于 2015-02-05T19:50:32.380 回答