2

我想知道如何设置一个为线程动态分配工作的类,最好是在 C# 中。我已经查看了[this]以获得光线追踪器的解释,但我什至没有看到关于它的外观的描述。我不想要过于复杂的东西。

我有一组任务太大而无法平均分配并分配给我的线程。我想动态地将小部分任务分配给每个线程,然后当线程完成时,获取它们的结果并给它们更多任务。我知道我几年前做过类似的事情,但我找不到那些笔记,也没有在谷歌或这里取得成功。

任何帮助将不胜感激。

下面是我对伪代码的看法。它不会看起来很漂亮,只是为了让您了解我在说什么。

TaskList = ArrayList of n tasks
ResultList = ArrayList of thread results
create x threads
integer TasksCompleted
while(TasksCompleted < TaskList.size){
  thread = next available thread in pool
  ResultList.add(thread.results)
  tasksCompleted += thread.numOfTasks
  thread.doWork(Next set of Tasks)
}
clean up threads
processResults(ResultList)
4

2 回答 2

1

如果您使用的是 .NET 4.0+,那么您可以使用 Parallel.For 方法。它会启动尽可能多的线程来并行处理循环。好处是它管理线程并为您监视它们。这是并行的操作方法介绍:http: //msdn.microsoft.com/en-us/library/dd460713.aspx

您还应该研究 .NET 4.0 中引入的任务并行库的其他构造。它有很多非常简洁的多线程助手,使多线程比启动和管理自己的线程更容易。您可以在此处查看 MSDN 文档:http: //msdn.microsoft.com/en-us/library/dd537609.aspx

于 2013-05-02T19:22:11.573 回答
0

这是一个BlockingCollection用于管理简单工作队列的示例。

当工作线程完成当前项目时,它将从工作队列中删除一个新项目,处理该项目,然后将其添加到输出队列。

一个单独的消费者线程从输出队列中删除已完成的项目并对其进行处理。

最后,我们必须等待所有工作人员完成(Task.WaitAll(workers)),然后才能将输出队列标记为已完成(outputQueue.CompleteAdding())。

这个例子只有工作项的整数;在实际代码中,您将使用封装工作的对象。

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    class Program
    {
        static void Main(string[] args)
        {
            new Program().run();
        }

        void run()
        {
            int threadCount = 4;
            Task[] workers = new Task[threadCount];

            Task.Factory.StartNew(consumer);

            for (int i = 0; i < threadCount; ++i)
            {
                int workerId = i;
                Task task = new Task(() => worker(workerId));
                workers[i] = task;
                task.Start();
            }

            for (int i = 0; i < 100; ++i)
            {
                Console.WriteLine("Queueing work item {0}", i);
                inputQueue.Add(i);
                Thread.Sleep(50);
            }

            Console.WriteLine("Stopping adding.");
            inputQueue.CompleteAdding();
            Task.WaitAll(workers);
            outputQueue.CompleteAdding();
            Console.WriteLine("Done.");

            Console.ReadLine();
        }

        void worker(int workerId)
        {
            Console.WriteLine("Worker {0} is starting.", workerId);

            foreach (var workItem in inputQueue.GetConsumingEnumerable())
            {
                Console.WriteLine("Worker {0} is processing item {1}", workerId, workItem);
                Thread.Sleep(100);          // Simulate work.
                outputQueue.Add(workItem);  // Output completed item.
            }

            Console.WriteLine("Worker {0} is stopping.", workerId);
        }

        void consumer()
        {
            Console.WriteLine("Consumer is starting.");

            foreach (var workItem in outputQueue.GetConsumingEnumerable())
            {
                Console.WriteLine("Consumer is using item {0}", workItem);
                Thread.Sleep(25);
            }

            Console.WriteLine("Consumer is finished.");
        }

        BlockingCollection<int> inputQueue = new BlockingCollection<int>();
        BlockingCollection<int> outputQueue = new BlockingCollection<int>();
    }
}

Plinq 和数据流

您还应该查看 Plinq,并且 - 如果您可以使用 .Net 4.5 - 还有Dataflow (Task Parallel Library)

于 2013-05-02T18:47:35.820 回答