1

我的目标是避免将线程池线程用于 CPU 密集型工作,从而避免 IIS 停止响应新请求的情况。

你能看到下面的代码有什么问题吗?这是一种安全/干净的方法吗?你能提供任何改进吗?

    private static ConcurrentQueue<Job> Jobs = new ConcurrentQueue<Job>();
    static int threadCount = 0;

    private void QueueJob(Job job)
    {

        lock(Jobs)
        {
            Jobs.Enqueue(job);
            if (threadCount == 0)
            {

                Interlocked.Increment(ref threadCount); 
                var t= new Thread(new ThreadStart(ConsumeQueue));              
                t.Start();

            }
        }



    }
    private void ConsumeQueue()
    {
        while (true)
        {
            lock (Jobs)
            { 
                if (!Jobs.Any())
                {
                    Interlocked.Decrement(ref threadCount);
                    return;
                }
            }

            Job j;

            var jobToDo = Jobs.TryDequeue(out j);

            if (jobToDo)
            {
                DoCPUBoundWork(j);
            }
        }

    }
4

2 回答 2

2

这是一个应该满足您需求的基本队列:

//sealed so we don't have to implement full IDisposable pattern
sealed class Q:IDisposable
{
    private CancellationTokenSource cts = new CancellationTokenSource();
    private BlockingCollection<Action> queue =
        new BlockingCollection<Action>(new ConcurrentQueue<Action>());

    public Q()
    {
        new Thread(() => RunQueue()).Start();
    }

    private void RunQueue()
    {
        while(!cts.IsCancellationRequested)
        {
            Action action;
            try
            {
                //lovely... blocks until something is available
                //so we don't need to deal with messy synchronization
                action = queue.Take(cts.Token); 
            }
            catch(OperationCanceledException)
            {
                break;
            }
            action();
        }
    }

    public void AddJob(Action action)
    {
        try
        {
            queue.Add(action,cts.Token);
        }
        catch(OperationCanceledException e)
        {
            throw new ObjectDisposedException("Q is disposed",e);
        }
    }

    public void Dispose()
    {
        if(!cts.IsCancellationRequested)
        {
            cts.Cancel();
        }
    }
}

按如下方式使用:

Q actionQueue=new Q();
actionQueue.AddJob(() => Console.WriteLine("action1"));
actionQueue.AddJob(() => Console.WriteLine("action2"));
actionQueue.AddJob(() => Console.WriteLine("action3"));
于 2012-12-11T15:32:15.407 回答
1

您的线程有可能在入队作业之前终止

lock (Jobs)
{ 
     if (!ResizeJobs.Any())
     {
         Interlocked.Decrement(ref threadCount);
         return;
     }
}

在此之后另一个作业将执行 Jobs.Enqueue(job);

我认为您不需要终止工作线程。它应该在睡眠状态下等待工作

于 2012-12-11T16:04:24.103 回答