2

我有一个集合,其中包含要处理的元素,最多只能同时处理四个元素。在运行时,所有进程一起启动并且都进入等待状态。一次只处理四个元素。

问题是处理元素是随机选择的,因为所有线程都在等待资源释放。意味着第一个元素可以是集合中的最后一个元素。

但是,我需要处理元素以便它们在集合中。

请告诉我如何实现这一目标?

我正在使用 TPL 和 C# 4.0

4

3 回答 3

5

对于并行性,总是存在定义“按顺序”的含义的问题。假设您有 100 个项目的集合。“一次按 4 个”处理它们(如您所要求的)可能意味着:

  1. 松散排序:使用 4 个线程,按照原始集合的顺序发出任务。

    在这种情况下,您可以使用:

    ParallelOptions po = new ParallelOptions() { MaxDegreeOfParallelism = 4 };
    Parallel.ForEach(list.AsParallel().AsOrdered(), po,
             (item) =>
             {
                 // code
             });
    

    在不平衡任务的情况下,这将很快失去原始顺序,因为一些线程可能会在繁重的任务上落后,但任务将按顺序分配。

  2. 严格排序:按 4 组的顺序处理它们,如下所示:

                   0 1 2 3                
                   4 tasks
         _____________________________
                    barrier
    
                   4 5 6 7                
                   4 tasks
         _____________________________
                    barrier
    
                     etc.
    

    在这种情况下,您可以使用屏障:

    Barrier b = new Barrier(4);
    ParallelOptions po = new ParallelOptions() { MaxDegreeOfParallelism = 4 };
    Parallel.ForEach(list.AsParallel().AsOrdered(), po,
        (item) =>
        {
            // code
            b.SignalAndWait();
        });
    

    尽管您必须确保任务数是 4 的倍数,否则在最后一次迭代中不会发出屏障信号。

  3. 在单个任务中处理 4 项:您可以创建一个任务对象来封装原始列表的 4 项,然后Parallel.ForEach像第一种情况那样做一个简单的操作(即每个线程将顺序处理 4 项作为单个任务的一部分)。这将按顺序以 4 个为一组发出任务,但如果任务花费的时间过长,可能会再次导致一些线程滞后。

于 2012-09-03T06:25:01.913 回答
0

我不清楚在“随机选择元素”的情况下你到底在做什么。但是,如果您使用Paralle.ForEach(),那么它会尝试提高效率,因此它会以某种方式对输入序列进行分区。如果输入序列是IList<T>,它将使用范围分区,否则,它将使用块分区(请参阅PLINQ 中的块分区与范围分区)。

如果要按顺序处理项目,可以Parallel.ForEach()使用自定义分区器进行配置,它将集合划分为大小为 1 的块。

但是由于您在这里并不真正需要Parallel.ForEach(),因此可能更简单的解决方案是创建 4 个任务来逐个处理项目。对于同步,您可以使用BlockingCollection. 就像是:

public static class ParallelOrdered
{
    public static void ForEach<T>(IEnumerable<T> collection, Action<T> action, int degreeOfParallelism)
    {
        var blockingCollection = new BlockingCollection<T>();
        foreach (var item in collection)
            blockingCollection.Add(item);
        blockingCollection.CompleteAdding();

        var tasks = new Task[degreeOfParallelism];
        for (int i = 0; i < degreeOfParallelism; i++)
        {
            tasks[i] = Task.Factory.StartNew(
                () =>
                {
                    foreach (var item in blockingCollection.GetConsumingEnumerable())
                        action(item);
                });
        }
        Task.WaitAll(tasks);
    }
}
于 2012-09-03T09:59:58.747 回答
0

这就是我完成这项任务的方式

public delegate void ProcessFinished(IParallelProcess process);
public interface IParallelProcess
{
    void Start();
    event ProcessFinished ProcessFinished;
}

public class ParallelProcessBasket : ConcurrentQueue<IParallelProcess>
{
    public void Put(IParallelProcess process)
    {
        base.Enqueue(process);
    }
    public IParallelProcess Get()
    {
        IParallelProcess process = null;
        base.TryDequeue(out process);
        return process;
    }
}
public class ParallelProcessor<T> where T : class
{
    private ParallelProcessBasket basket;
    private readonly int MAX_DEGREE_OF_PARALLELISM;
    private Action<T> action;
    public ParallelProcessor(int degreeOfParallelism, IEnumerable<IParallelProcess> processes, Action<T> action)
    {
        basket = new ParallelProcessBasket();
        this.action = action;
        processes.ToList().ForEach(
            (p) =>
            {
                basket.Enqueue(p);
                p.ProcessFinished += new ProcessFinished(p_ProcessFinished);
            });
        MAX_DEGREE_OF_PARALLELISM = degreeOfParallelism;
    }

    private void p_ProcessFinished(IParallelProcess process)
    {
        if (!basket.IsEmpty)
        {
            T element = basket.Get() as T;
            if (element != null)
            {
                Task.Factory.StartNew(() => action(element));
            }
        }
    }


    public void StartProcessing()
    {
        // take first level of iteration
        for (int cnt = 0; cnt < MAX_DEGREE_OF_PARALLELISM; cnt++)
        {
            if (!basket.IsEmpty)
            {
                T element = basket.Get() as T;
                if (element != null)
                {
                    Task.Factory.StartNew(() => action(element));
                }
            }
        }
    }
}
static void Main(string[] args)    
{
     ParallelProcessor<ParallelTask> pr = new ParallelProcessor<ParallelTask>(Environment.ProcessorCount, collection, (e) => e.Method1());
            pr.StartProcessing();
}

谢谢..

于 2012-10-02T06:39:48.763 回答