9

我正在使用 C# Parallel.ForEach 处理一千多个数据子集。一组需要 5-30 分钟来处理,具体取决于组的大小。在我的电脑上有选项

ParallelOptions po = new ParallelOptions();
po.MaxDegreeOfParallelism = Environment.ProcessorCount

我将获得 8 个并行进程。据我了解,进程在并行任务之间平均分配(例如,第一个任务获得作业编号 1、9、17 等,第二个任务获得 2、10、18 等);因此,一项任务可以比其他任务更快地完成自己的工作。因为这些数据集比其他数据集花费的时间更少。

问题是四个并行任务在 24 小时内完成,但最后一个在 48 小时内完成。是否有机会组织并行性以使所有并行任务均等地完成?这意味着所有并行任务继续工作,直到所有工作都完成?

4

3 回答 3

4

由于作业不相等,因此您无法在处理器之间分配作业数量并让它们几乎同时完成。我认为你需要的是 8 个工作线程来检索下一个工作。您将不得不对该功能使用锁定才能获得下一份工作。

如果我错了,请有人纠正我,但在我的脑海中......一个工作线程可以被赋予这样的功能:

public void ProcessJob()
{
    for (Job myJob = GetNextJob(); myJob != null; myJob = GetNextJob())
    {
        // process job
    }
}

获得下一份工作的函数如下所示:

private List<Job> jobs;
private int currentJob = 0;

private Job GetNextJob()
{
    lock (jobs)
    {
        Job job = null;
        if (currentJob < jobs.Count)
        {
            job = jobs[currentJob];
            currentJob++;
        }
        return job;
    }
}
于 2013-03-06T16:08:59.127 回答
1

正如其他人所建议的,一种选择是管理您自己的生产者消费者队列。我想指出,使用BlockingCollection使这容易做到。

BlockingCollection<JobData> queue = new BlockingCollection<JobData>();

//add data to queue; if it can be done quickly, just do it inline.  
//If it's expensive, start a new task/thread just to add items to the queue.
foreach (JobData job in data)
    queue.Add(job);

queue.CompleteAdding();

for (int i = 0; i < Environment.ProcessorCount; i++)
{
    Task.Factory.StartNew(() =>
    {
        foreach (var job in queue.GetConsumingEnumerable())
        {
            ProcessJob(job);
        }
    }, TaskCreationOptions.LongRunning);
}
于 2013-03-07T17:34:08.137 回答
1

似乎没有现成的解决方案,必须创建它。

我之前的代码是:

var ListOfSets = (from x in Database
           group x by x.SetID into z
           select new { ID = z.Key}).ToList();

ParallelOptions po = new ParallelOptions();
po.MaxDegreeOfParallelism = Environment.ProcessorCount;

Parallel.ForEach(ListOfSets, po, SingleSet=>
{
     AnalyzeSet(SingleSet.ID);
});

为了在所有 CPU 之间平均分担工作,我仍然习惯于Parallel做这项工作,但ForEach我没有使用For和来自 Matt 的想法。新代码是:

Parallel.For(0, Environment.ProcessorCount, i=>
{
    while(ListOfSets.Count() > 0)
    {
        double SetID = 0;
        lock (ListOfSets)
        {
            SetID = ListOfSets[0].ID;
            ListOfSets.RemoveAt(0);
        }
     AnalyzeSet(SetID);
    }
});

所以,谢谢你的建议。

于 2013-03-07T17:24:29.797 回答