1

我从这里的 C# 代码示例开始。我试图调整它有几个原因:1)在我的场景中,所有任务将在消费者开始之前预先放入队列中,2)我想将工作人员抽象到一个单独的类中,而不是让类中的原始Thread成员WorkerQueue

我的队列似乎并没有自行处理,它只是挂起,当我在 Visual Studio 中中断时,它卡在#1的_th.Join()线上。WorkerThread另外,有没有更好的方法来组织这个?关于暴露WaitOne()andJoin()方法的一些事情似乎是错误的,但我想不出一种合适的方式让WorkerThread队列与队列交互。

另外,顺便说一句 - 如果我q.Start(#)在块的顶部调用,则using只有一些线程每次启动(例如线程 1、2 和 8 处理每个任务)。为什么是这样?这是某种竞争条件,还是我做错了什么?

using System;
using System.Collections.Generic;
using System.Text;
using System.Messaging;
using System.Threading;
using System.Linq;

namespace QueueTest
{
    class Program
    {
        static void Main(string[] args)
        {
            using (WorkQueue q = new WorkQueue())
            {
                q.Finished += new Action(delegate { Console.WriteLine("All jobs finished"); });

                Random r = new Random();
                foreach (int i in Enumerable.Range(1, 10))
                    q.Enqueue(r.Next(100, 500));

                Console.WriteLine("All jobs queued");
                q.Start(8);
            }
        }
    }

    class WorkQueue : IDisposable
    {
        private Queue<int> _jobs = new Queue<int>();
        private int _job_count;
        private EventWaitHandle _wh = new AutoResetEvent(false);
        private object _lock = new object();
        private List<WorkerThread> _th;
        public event Action Finished;

        public WorkQueue()
        {
        }

        public void Start(int num_threads)
        {
            _job_count = _jobs.Count;
            _th = new List<WorkerThread>(num_threads);
            foreach (int i in Enumerable.Range(1, num_threads))
            {
                _th.Add(new WorkerThread(i, this));
                _th[_th.Count - 1].JobFinished += new Action<int>(WorkQueue_JobFinished);
            }
        }

        void WorkQueue_JobFinished(int obj)
        {
            lock (_lock)
            {
                _job_count--;
                if (_job_count == 0 && Finished != null)
                    Finished();
            }
        }

        public void Enqueue(int job)
        {
            lock (_lock)
                _jobs.Enqueue(job);

            _wh.Set();
        }

        public void Dispose()
        {
            Enqueue(Int32.MinValue);
            _th.ForEach(th => th.Join());
            _wh.Close();
        }

        public int GetNextJob()
        {
            lock (_lock)
            {
                if (_jobs.Count > 0)
                    return _jobs.Dequeue();
                else
                    return Int32.MinValue;
            }
        }

        public void WaitOne()
        {
            _wh.WaitOne();
        }
    }

    class WorkerThread
    {
        private Thread _th;
        private WorkQueue _q;
        private int _i;

        public event Action<int> JobFinished;

        public WorkerThread(int i, WorkQueue q)
        {
            _i = i;
            _q = q;
            _th = new Thread(DoWork);
            _th.Start();
        }

        public void Join()
        {
            _th.Join();
        }

        private void DoWork()
        {
            while (true)
            {
                int job = _q.GetNextJob();
                if (job != Int32.MinValue)
                {
                    Console.WriteLine("Thread {0} Got job {1}", _i, job);
                    Thread.Sleep(job * 10); // in reality would to actual work here
                    if (JobFinished != null)
                        JobFinished(job);
                }
                else
                {
                    Console.WriteLine("Thread {0} no job available", _i);
                    _q.WaitOne();
                }
            }
        }
    }
}
4

4 回答 4

5

工作线程都阻塞在 DoWork() 中的 _q.WaitOne() 调用上。调用线程的 Join() 方法会死锁,线程永远不会退出。您需要添加一种机制来向工作线程发出信号以退出。一个 ManualResetEvent,在 worker 中使用 WaitAny 测试,将完成工作。

一个调试技巧:熟悉 Debug + Windows + Threads 窗口。它允许您在线程之间切换并查看它们的调用堆栈。你自己很快就会发现这个问题。

于 2010-06-02T19:50:41.707 回答
1

您的主要问题是其他答案中描述的确定性死锁。

但是,处理它的正确方法不是修复死锁,而是完全消除 Event。

Producer-Consumer 模型的整体思想是客户端同时加入 En-queue 和 De-queue 元素,这就是需要同步机制的原因。如果您事先将所有元素入队,然后仅同时出队,则只需要锁定出队,因为“事件”用于让“消费者”等待新元素入队;这不会发生在您的情况下(根据您的描述)。

此外,“单一职责”设计原则建议线程代码应该与“阻塞队列”代码分开。使“阻塞队列”成为自己的类,然后在您的线程管理类中使用它。

于 2010-06-02T20:01:18.303 回答
1

WaitOne()在结束时执行 a ,DoWork但在线程开始运行后您从未设置它。
请注意,AutoResetEvent“成功”后将返回未设置状态WaitOne

于 2010-06-02T19:52:22.737 回答
1

DoWork 方法中的循环永远不会结束。这将导致线程总是很忙,并且这个 thread.Join() 将永远阻塞,等待它完成。

你有一个 WaitOne,但我认为没有必要,除非你有理由希望你的线程池在你的工作完成后继续存在:

    private void DoWork()
    {
        bool done = false;
        while (!done)
        {
            int job = _q.GetNextJob();
            if (job != Int32.MinValue)
            {
                Console.WriteLine("Thread {0} Got job {1}", _i, job);
                Thread.Sleep(job * 10); // in reality would to actual work here
                if (JobFinished != null)
                    JobFinished(job);
            }
            else
            {
                Console.WriteLine("Thread {0} no job available", _i);
                done = true;
            }
        }
    }

如果您希望线程保持不变,以便在调用 WorkQueue.Start 时不必重新分配更多线程,则必须使用 AutoResetEvent 做一些更精细的事情。

于 2010-06-02T19:56:11.780 回答