3

我坚信通过重塑来学习。带着这种心态,我着手实现自定义线程池。我为自己设定的目标如下:

  1. 能够在线程池中对工作项进行排队。
  2. 能够处理具有固定数量线程的工作项——所有线程同时创建。
  3. 普通工作线程函数应该只知道如何 deque,而不应该处理其他函数/属性,如 IsEmpty 或 Count。

我成功地实现了上述目标,但想验证我与 stackoverflow 专家一起采用的方法。另外,想了解是否有更好的方法,或者多线程专家将如何解决这个问题。以下段落提到了我面临的挑战以及我是如何解决的。

我创建的线程池在内部维护了一个工作项队列,所有工作线程都从该队列中选择并处理它。每当一个新项目排队时,它都会发出一个事件信号,以便任何空闲线程都可以拾取并执行它。

我从 autoresetevent 开始向队列中任何新工作项的等待线程发出信号,但我遇到了丢失信号事件的问题。当多个项目排队并且没有空闲线程来处理该项目时,就会发生这种情况。未处理的项目总数与由于重叠集合(信号)事件而丢失的信号事件总数相同。

为了解决丢失信号事件的问题,我在 autoresetevent 之上创建了一个包装器,并使用它来代替 autoresetevent。它解决了这个问题。这是相同的代码清单:

public static class CustomThreadPool
{
    static CustomThreadPool()
    {
        for (int i = 0; i < minThreads; i++)
            _threads.Add(
                new Thread(ThreadFunc) { IsBackground = true }
                );

        _threads.ForEach((t) => t.Start());
    }

    public static void EnqueWork(Action action)
    {
        _concurrentQueue.Enqueue(action);
        _enqueEvent.Set();
    }

    private static void ThreadFunc()
    {
        Action action = null;
        while (true)
        {
            _enqueEvent.WaitOne();
            _concurrentQueue.TryDequeue(out action);
            action();
        }
    }

    private static ConcurrentQueue<Action> _concurrentQueue = new ConcurrentQueue<Action>();
    private static List<Thread> _threads = new List<Thread>();
    private static CountAutoResentEvent _enqueEvent = new CountAutoResentEvent();
    private static object _syncObject = new object();
    private const int minThreads = 4;
    private const int maxThreads = 10;

    public static void Test()
    {
        CustomThreadPool.EnqueWork(() => {

            for (int i = 0; i < 10; i++) Console.WriteLine(i);
            Console.WriteLine("****First*****");
        });

        CustomThreadPool.EnqueWork(() =>
        {
            for (int i = 0; i < 10; i++) Console.WriteLine(i);
            Console.WriteLine("****Second*****");
        });

        CustomThreadPool.EnqueWork(() =>
        {
            for (int i = 0; i < 10; i++) Console.WriteLine(i);
            Console.WriteLine("****Third*****");
        });

        CustomThreadPool.EnqueWork(() =>
        {
            for (int i = 0; i < 10; i++) Console.WriteLine(i);
            Console.WriteLine("****Fourth*****");
        });

        CustomThreadPool.EnqueWork(() =>
        {
            for (int i = 0; i < 10; i++) Console.WriteLine(i);
            Console.WriteLine("****Fifth*****");
        });
    }
}

public class CountAutoResentEvent
{
    public void Set()
    {
        _event.Set();
        lock (_sync)
            _countOfSet++;
    }

    public void WaitOne()
    {
        _event.WaitOne();
        lock (_sync)
        {
            _countOfSet--;
            if (_countOfSet > 0)
                _event.Set();
        }
    }

    private AutoResetEvent _event = new AutoResetEvent(false);
    private int _countOfSet = 0;
    private object _sync = new object();
}

现在,我有几个问题:

  1. 我的方法是否充分证明?
  2. 什么同步机制最适合这个问题,为什么?
  3. 多线程专家将如何处理这个问题?

谢谢。

4

1 回答 1

1

从我所看到的我会说这是正确的。我喜欢你已经使用ConcurrentQueue并且没有去实现你自己的同步队列。这是一团糟,很可能不会像现有的那样快。

我只想指出,您的自定义“信号机制”实际上与信号量非常相似:一种允许多个线程进入临界区的锁。这个功能已经存在于Semaphore类中。

于 2011-12-17T08:59:34.940 回答