0

我们的生产服务器上运行着一个作业服务,它由计时器每隔 2 分钟触发一次。
该服务从数据库中收集 20 个任务,然后创建 20 个 URL(同一主机,不同参数);使用线程池线程并行触发它们,并等待响应。
URL 是环回的。即,目标网站也托管在同一服务器中。

处理流程如下:

1. If a task is simple, the URL response gets back to the job service within seconds.
   But the job service still has to wait for the rest of the 2-minute interval before it picks the next 20 jobs.
   The CPU is idle during that time.
   How can we efficiently use this idle CPU time to process more jobs?

2. If a task is long-running, the job service waits at most 2 minutes
   for the response; if no response arrives in that time, the service
   picks the next 20 jobs to process. As a result, jobs queue up
   and CPU usage goes very high.
   How do we prevent such a situation?


除了用计时器定期选取作业之外,还有没有其他更有效的方式来处理作业?
例如监控 IIS 工作进程和 CPU 使用率,并据此选取作业进行处理……
如果可行,我们如何用 C# 代码监控 IIS 工作进程和 CPU 使用率?

或任何其他想法......谢谢。

更新:
用于创建并行线程的代码片段:

/// <summary>
/// Picks batches of snapshot tasks from the database, runs each task on a
/// thread-pool thread and waits (with a time-out) for the batch to finish
/// before picking the next one.
/// </summary>
public class ParallelProcess
{
  #region "Private Fields"

  #region "Static"
  //Number of threads to be activated to process each job request.
  private static int MaxConcurrentThreads = 0;
  //Minimum number of idle threads that should exist in the pool.
  private static int ThreadBuffer = 0;
  //Time out in ms for the thread pool to wait on all threads to complete.
  private static int ThreadWaitTimeOut = 0;
  //Available worker threads in the thread pool (refreshed by GetMaxItemsRetrievalCount).
  private static int workerThreads = 0;
  //Available completion-port threads in the thread pool (refreshed by GetMaxItemsRetrievalCount).
  private static int completionThreads = 0;
  //Minimum worker threads configured on the ThreadPool.
  private static int minW = 0;
  //Minimum completion-port threads configured on the ThreadPool.
  private static int minC = 0;
  #endregion

  #endregion

  #region "Static constructor"
  /// <summary>
  /// Reads the threading configuration, registers the unhandled-exception
  /// logger and applies the minimum thread count to the thread pool.
  /// Any failure is logged rather than thrown, because an exception escaping
  /// a static constructor would make the type unusable.
  /// </summary>
  static ParallelProcess()
  {
    try
    {
      //Subscribe exactly once per AppDomain. Subscribing from the instance
      //constructor added one more handler for every object created, which kept
      //every instance alive and logged each unhandled exception many times.
      AppDomain.CurrentDomain.UnhandledException += OnUnhandledException;

      //Required threads defined in C:\HobooAppServer_Config\hobooAppServer_config.xml
      MaxConcurrentThreads = Configuration.RequiredThread;
      ThreadBuffer = Configuration.ThreadBuffer;
      //In milliseconds
      ThreadWaitTimeOut = Configuration.TRWaitTime;

      //Get min number of threads from thread pool (both arguments are out parameters).
      ThreadPool.GetMinThreads(out minW, out minC);

      //NOTE(review): this only ever lowers the pool minimum down to
      //MaxConcurrentThreads; when the minimum is already smaller it is
      //re-applied unchanged. Confirm that raising the minimum was not intended.
      if (minW >= MaxConcurrentThreads)
      {
        ThreadPool.SetMinThreads(MaxConcurrentThreads, minC);
        minW = MaxConcurrentThreads;
      }
      else
      {
        ThreadPool.SetMinThreads(minW, minC);
      }
    }
    catch (Exception ex)
    {
      //Write exception to log file.
      WriteJobLog(new JobLogDTO
      {
        Mode = "Parallel",
        UniqueId = "Thread Pool Exception",
        ThreadId = Thread.CurrentThread.ManagedThreadId.ToString(),
        StartTime = DateTime.Now.ToString(),
        ExceptionOrResult = ex.ToString()
      });
    }
  }
  #endregion

  #region "Instance constructor"
  /// <summary>
  /// Creates a processor instance. The unhandled-exception handler is
  /// registered once by the static constructor, so nothing is done here.
  /// </summary>
  public ParallelProcess()
  {
  }
  #endregion

  #region "Private methods"
  /// <summary>
  /// Gets the number of jobs that may be started right now: the number of idle
  /// worker threads in the pool, capped at MaxConcurrentThreads and never negative.
  /// </summary>
  /// <returns>
  /// The capped idle worker-thread count, or 1 when the thread pool could not
  /// be queried (the failure is logged).
  /// </returns>
  private int GetMaxItemsRetrievalCount()
  {
    int rtnVal = 1;
    try
    {
      //Get available idle threads currently in the thread pool (out parameters).
      ThreadPool.GetAvailableThreads(out workerThreads, out completionThreads);
      rtnVal = Math.Max(0, Math.Min(workerThreads, MaxConcurrentThreads));
    }
    catch (Exception ex)
    {
      //Write exceptions to log file.
      WriteJobLog(new JobLogDTO
      {
        Mode = "Parallel",
        UniqueId = "GetMaxItemsRetrievalCount Exception",
        ThreadId = Thread.CurrentThread.ManagedThreadId.ToString(),
        StartTime = DateTime.Now.ToString(),
        ExceptionOrResult = ex.ToString()
      });
    }
    return rtnVal;
  }

  /// <summary>
  /// Waits until every event is signalled or the overall time-out elapses.
  /// </summary>
  /// <remarks>
  /// The events are awaited one after another against a shared deadline instead
  /// of calling WaitHandle.WaitAll, which rejects more than 64 handles and is
  /// not supported on STA threads.
  /// </remarks>
  /// <param name="events">Events to wait for.</param>
  /// <param name="timeoutMs">
  /// Overall time-out in milliseconds; 0 only tests the current state and
  /// Timeout.Infinite waits without limit.
  /// </param>
  /// <returns>true when all events were signalled in time; otherwise false.</returns>
  private static bool WaitForAll(ManualResetEvent[] events, int timeoutMs)
  {
    System.Diagnostics.Stopwatch elapsed = System.Diagnostics.Stopwatch.StartNew();
    foreach (ManualResetEvent signal in events)
    {
      //Zero, infinite and invalid values are handed to WaitOne unchanged so
      //they keep their usual meaning (or raise the usual exception).
      int remaining = timeoutMs;
      if (timeoutMs > 0)
      {
        long spent = Math.Min(elapsed.ElapsedMilliseconds, (long)int.MaxValue);
        remaining = Math.Max(0, timeoutMs - (int)spent);
      }

      if (!signal.WaitOne(remaining))
      {
        return false;
      }
    }
    return true;
  }

  /// <summary>
  /// Log unhandled exceptions from this application domain if any.
  /// </summary>
  /// <param name="sender">Source of the event (not used).</param>
  /// <param name="e">Carries the object that was thrown.</param>
  private static void OnUnhandledException(object sender, UnhandledExceptionEventArgs e)
  {
    //ExceptionObject is not guaranteed to be an Exception, so casting it and
    //calling ToString() on the result could throw inside this handler.
    //Convert.ToString copes with null and with non-Exception objects.
    WriteJobLog(new JobLogDTO
    {
      Mode = "Parallel",
      UniqueId = "UnhandledException",
      ThreadId = Thread.CurrentThread.ManagedThreadId.ToString(),
      StartTime = DateTime.Now.ToString(),
      ExceptionOrResult = Convert.ToString(e.ExceptionObject)
    });
  }
  #endregion

  #region "public methods"
  /// <summary>
  /// Repeatedly picks a batch of tasks and processes it on worker threads until
  /// no more tasks are returned.
  /// </summary>
  /// <param name="pIsNight">Passed to the task picker and to every work item.</param>
  /// <param name="plPriority">Passed to the task picker.</param>
  /// <param name="pUniqueId">Identifier written to the job log and passed to every work item.</param>
  public void ProcessBatchJobs(bool pIsNight, bool plPriority, string pUniqueId)
  {
    bool isContinue = true;

    do
    {
      if (GetMaxItemsRetrievalCount() <= 0)
      {
        //We did not get a thread to execute. So wait for a free thread(s).
        Thread.Sleep(1000);
        continue;
      }

      //Pick jobs from database
      List<SnapShotTask> Jobs = Business.Rtds.SnapShot.PickTasks(pIsNight, plPriority);
      if (Jobs == null || Jobs.Count == 0)
      {
        //TODO : Retry Logic
        //Nothing to process.
        isContinue = false;
        continue;
      }

      //Log Header-Thread Pool Information And Statistics - In Parallel Threads
      WriteJobLog(new JobLogDTO
      {
        Mode = "Parallel",
        UniqueId = pUniqueId,
        ThreadId = Thread.CurrentThread.ManagedThreadId.ToString(),
        StartTime = DateTime.Now.ToString(),
        AvblWorkerThread = workerThreads.ToString(),
        AvblPortThread = completionThreads.ToString(),
        AcqrdWorkerThread = minW.ToString(),
        AcqurdPortThread = minC.ToString(),
        JobsToProcess = Jobs.Count.ToString()
      });

      ManualResetEvent[] signalEvents = new ManualResetEvent[Jobs.Count];
      int signalCount = 0;

      //Loop through each job, create call back function, add items to queue, fire them
      foreach (SnapShotTask job in Jobs)
      {
        signalEvents[signalCount] = new ManualResetEvent(false);
        BatchCallBack threadPoolCallBack =
          new BatchCallBack(job, signalEvents[signalCount]);
        ThreadPool.QueueUserWorkItem(
          new WaitCallback(threadPoolCallBack.ThreadPoolCallback),
          new BatchThreadData
          {
            IsNight = pIsNight,
            UniqueId = pUniqueId
          });
        signalCount += 1;
      }

      //Wait here until child threads finish the job or the timeout is reached.
      bool allCompleted = WaitForAll(signalEvents, ParallelProcess.ThreadWaitTimeOut);

      if (allCompleted)
      {
        //Every work item has signalled, so the handles can be released now.
        //After a time-out they are left alone: a work item that is still
        //running will signal its event later and must not find it closed.
        foreach (ManualResetEvent signal in signalEvents)
        {
          signal.Close();
        }
      }
    } while (isContinue);
  }
  #endregion
}
4

1 回答 1

1

您不必监视 CPU 负载,因为操作系统会为您执行此操作。它将根据线程的优先级调度线程。

将执行任务的线程设置为较低的优先级。这样,这些任务会被推迟,直到 CPU 有空闲时间来执行这些不太重要的线程。

像这样设置优先级:

// Thread exposes the setting as the Priority property; there is no ThreadPriority property.
Thread.CurrentThread.Priority = ThreadPriority.BelowNormal; // Or ThreadPriority.Lowest.
于 2013-05-03T08:38:27.687 回答