0

我的系统中有很多长期运行但不受 CPU 限制的“工作”。我想设置一个 Worker 角色来处理这些,但它们具有足够的可扩展性,一个 Worker 角色可以轻松地让 10-20 个线程同时处理一个“作业”。

这里有一些问题建议使用 TPL,我确实有一些有限的经验。然而,我不明白的是如何管理线程以便有最大数量的线程,以及如何在释放线程时调度它们。

稍微复杂的是,我想使用 Ninject 来创建每个“工作”所需的服务。

这是我在脑海中想象它的方式:

while (true)
{
    // Don't go unless we have a free slot (how do I implement this?!)
    if (FreeThreadExists)
    {
        // Get the next message
        CloudQueueMessage ThisMessage = Queue.GetMessage(TimeSpan.FromMinutes(3));

        // Get the new job and inject services
        Job MyJob = Kernel.Get<Job>();

        // Start this job
        // Will I need to keep ahold of this Task?
        // And how do I know when it's done so that FreeThreadExists changes?
        Task.Factory.StartNew(() => MyJob.Run(ThisMessage));
    }
    else
    {    
        // Sleep to prevent choking
        Thread.Sleep(500);
    }
}

然后在该线程中删除完成后的消息。基本上,我试图将一个 Worker 拆分为 20 个“实例”,而不会丢失太多 Azure 功能(特别是我想要队列消息超时/重试功能)。

我在.NET线程方面相当缺乏经验,最好的方法是什么?

编辑:哇,我完全忘了补充一点:这需要跨多个工人进行扩展。因此,10 个 Worker 角色,每个角色有 10 个线程,消息由 UI 前端排队,然后由第一个具有空闲线程的 Worker 出列并运行。

4

2 回答 2

1

试试这个伪代码:

while (true)
{
    int maxThreadsPerWorkerRole = 3;//assuming each worker role can handle 3 jobs simultaneously
    var messages = Queue.GetMessages(3);//Get 3 messages from the queue
    if (messages != null && messages.Count > 0)//Ensuring there is some work which needs to be done
    {
        var myTasks = new List<Task>();
        for (int i=0; i<messages.Count; i++)
        {
            Job MyJob = Kernel.Get<Job>();//Get the job
            var task = Task.Factory.StartNew(() => MyJob.Run(messages[i])); 
            myTasks.Add(task);
        }
        Task.WaitAll(myTasks.ToArray());//Wait for all tasks to complete.
        for (int i=0; i<messages.Count; i++)
        {
            //Write code to delete the message.
        }
        //Check if the queue is empty or not. If the queue is not empty, then repeat this loop
        //Otherwise simply exit this loop.
        if (Queue.RetrieveApproximateMessageCount() == 0)
        {
            break;
        }
    }
}

希望这可以帮助。

于 2012-10-23T15:56:52.473 回答
1

正如您在代码中提到的,我通常还涉及 TPL。作为 Gaurav 方法的替代方法,请参见下面的代码。下面的伪代码使用 Parallel.For,它控制线程的创建。每个线程启动并运行一个无限循环;如果没有工作,请确保您睡一会儿。

// Start 10 threads
Parallel.For(0, 10, (i) =>
{
  while (true)
  {
    // Get message from queue
    var msg = Queue.GetMessage();
    if (msg != null)
    {
      // Do some work here...
      StartSomeJob();

      // Then when you are done, delete the message
      Queue.DeleteMessage(msg);
    }
    // Wait 1 second before fetching next work item from queue
    System.Threading.Thread.Sleep(1000);
  }
});
于 2012-10-23T23:20:19.447 回答