我坚信通过重塑来学习。带着这种心态,我着手实现自定义线程池。我为自己设定的目标如下:
- 能够在线程池中对工作项进行排队。
- 能够处理具有固定数量线程的工作项——所有线程同时创建。
- 普通工作线程函数应该只知道如何 deque,而不应该处理其他函数/属性,如 IsEmpty 或 Count。
我成功地实现了上述目标,但想验证我与 stackoverflow 专家一起采用的方法。另外,想了解是否有更好的方法,或者多线程专家将如何解决这个问题。以下段落提到了我面临的挑战以及我是如何解决的。
我创建的线程池在内部维护了一个工作项队列,所有工作线程都从该队列中选择并处理它。每当一个新项目排队时,它都会发出一个事件信号,以便任何空闲线程都可以拾取并执行它。
我从 autoresetevent 开始向队列中任何新工作项的等待线程发出信号,但我遇到了丢失信号事件的问题。当多个项目排队并且没有空闲线程来处理该项目时,就会发生这种情况。未处理的项目总数与由于重叠集合(信号)事件而丢失的信号事件总数相同。
为了解决丢失信号事件的问题,我在 autoresetevent 之上创建了一个包装器,并使用它来代替 autoresetevent。它解决了这个问题。这是相同的代码清单:
public static class CustomThreadPool
{
static CustomThreadPool()
{
for (int i = 0; i < minThreads; i++)
_threads.Add(
new Thread(ThreadFunc) { IsBackground = true }
);
_threads.ForEach((t) => t.Start());
}
public static void EnqueWork(Action action)
{
_concurrentQueue.Enqueue(action);
_enqueEvent.Set();
}
private static void ThreadFunc()
{
Action action = null;
while (true)
{
_enqueEvent.WaitOne();
_concurrentQueue.TryDequeue(out action);
action();
}
}
private static ConcurrentQueue<Action> _concurrentQueue = new ConcurrentQueue<Action>();
private static List<Thread> _threads = new List<Thread>();
private static CountAutoResentEvent _enqueEvent = new CountAutoResentEvent();
private static object _syncObject = new object();
private const int minThreads = 4;
private const int maxThreads = 10;
public static void Test()
{
CustomThreadPool.EnqueWork(() => {
for (int i = 0; i < 10; i++) Console.WriteLine(i);
Console.WriteLine("****First*****");
});
CustomThreadPool.EnqueWork(() =>
{
for (int i = 0; i < 10; i++) Console.WriteLine(i);
Console.WriteLine("****Second*****");
});
CustomThreadPool.EnqueWork(() =>
{
for (int i = 0; i < 10; i++) Console.WriteLine(i);
Console.WriteLine("****Third*****");
});
CustomThreadPool.EnqueWork(() =>
{
for (int i = 0; i < 10; i++) Console.WriteLine(i);
Console.WriteLine("****Fourth*****");
});
CustomThreadPool.EnqueWork(() =>
{
for (int i = 0; i < 10; i++) Console.WriteLine(i);
Console.WriteLine("****Fifth*****");
});
}
}
public class CountAutoResentEvent
{
public void Set()
{
_event.Set();
lock (_sync)
_countOfSet++;
}
public void WaitOne()
{
_event.WaitOne();
lock (_sync)
{
_countOfSet--;
if (_countOfSet > 0)
_event.Set();
}
}
private AutoResetEvent _event = new AutoResetEvent(false);
private int _countOfSet = 0;
private object _sync = new object();
}
现在,我有几个问题:
- 我的方法是否充分证明?
- 什么同步机制最适合这个问题,为什么?
- 多线程专家将如何处理这个问题?
谢谢。