2

这是我的问题here

通过阅读……我从信号量转移到了线程池。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace ThreadPoolTest
{
    class Data
    {
        public int Pos { get; set; }
        public int Num { get; set; }
    }

    class Program
    {
        static ManualResetEvent[] resetEvents = new ManualResetEvent[20];

        static void Main(string[] args)
        {            

            int s = 0;
            for (int i = 0; i < 100000; i++)
            {                
                resetEvents[s] = new ManualResetEvent(false);
                Data d = new Data();
                d.Pos = s;
                d.Num = i;
                ThreadPool.QueueUserWorkItem(new WaitCallback(Process), (object)d);
                if (s >= 19)
                {
                    WaitHandle.WaitAll(resetEvents);
                    Console.WriteLine("Press Enter to Move forward");
                    Console.ReadLine();
                    s = 0;
                }
                else
                {
                    s = s + 1;
                }
            }
        }

        private static void Process(object o)
        {
            Data d = (Data) o;
            Console.WriteLine(d.Num.ToString());
            Thread.Sleep(10000);
            resetEvents[d.Pos].Set();
        }
    }
}

这段代码有效,我可以处理 20 组。但由于 WaitAll,我不喜欢这段代码。所以假设我开始一批 20 个线程,3 个线程需要更长的时间,而 17 个线程已经完成。即使那样,由于 WaitAll 的原因,我仍将保持 17 个线程处于等待状态。

WaitAny 本来会很好……但是我必须构建很多控制结构(如堆栈、列表、队列等)才能有效地使用池,这似乎很麻烦。

我不喜欢的另一件事是 resetEvents 类中的整个全局变量。因为这个数组必须在 Process 方法和主循环之间共享。

上面的代码有效......但我需要你的帮助来改进它。

再次...我在 .NET 2.0 VS 2008 上。我不能使用 .NET 4.0 并行/异步框架。

4

2 回答 2

3

有几种方法可以做到这一点。根据您上面发布的内容,最简单的可能是:

const int MaxThreads = 4;
const int ItemsToProcess = 10000;
private Semaphore _sem = new Semaphore(MaxThreads, MaxThreads);

void DoTheWork()
{
    int s = 0;
    for (int i = 0; i < ItemsToProcess; ++i)
    {
        _sem.WaitOne();
        Data d = new Data();
        d.Pos = s;
        d.Num = i;
        ThreadPool.QueueUserWorkItem(Process, d);
        ++s;
        if (s >= 19)
            s = 0;
    }

    // All items have been assigned threads.
    // Now, acquire the semaphore "MaxThreads" times.
    // When counter reaches that number, we know all threads are done.
    int semCount = 0;
    while (semCount < MaxThreads)
    {
        _sem.WaitOne();
        ++semCount;
    }
    // All items are processed

    // Clear the semaphore for next time.
    _sem.Release(semCount);
}

void Process(object o)
{
    // do the processing ...

    // release the semaphore
    _sem.Release();
}

在我的示例中,我只使用了四个线程,因为这就是我拥有的内核数。当任何时候只有四个线程可以处理时,使用 20 个线程毫无意义。但是,如果您愿意,您可以随意增加MaxThreads数量。

于 2013-02-27T20:17:01.700 回答
2

所以我很确定这都是.NET 2.0。

我们将开始定义Action,因为我已经习惯了使用它。如果在 3.5+ 中使用此解决方案,请删除该定义。

接下来,我们根据输入创建一个动作队列。

之后我们定义一个回调;这个回调是方法的核心。

它首先抓取队列中的下一项(使用锁,因为队列不是线程安全的)。如果它最终有一个要抓取的项目,它会执行该项目。接下来,它将一个新项目添加到“自身”的线程池中。这是一种递归匿名方法(您不会经常遇到这种用法)。这意味着当第一次调用回调时,它将执行一个项目,然后安排一个将执行另一个项目的任务,该项目将安排一个执行另一个项目的任务,依此类推。最终队列将用完,他们将停止排队更多项目。

我们还希望该方法在我们全部完成之前一直阻塞,因此我们通过增加一个计数器来跟踪这些回调中有多少已经完成。当该计数器达到任务限制时,我们会发出事件信号。

最后,我们在线程池中启动 N 个这些回调。

public delegate void Action();
public static void Execute(IEnumerable<Action> actions, int maxConcurrentItems)
{
    object key = new object();
    Queue<Action> queue = new Queue<Action>(actions);
    int count = 0;
    AutoResetEvent whenDone = new AutoResetEvent(false);

    WaitCallback callback = null;
    callback = delegate
    {
        Action action = null;
        lock (key)
        {
            if (queue.Count > 0)
                action = queue.Dequeue();
        }
        if (action != null)
        {
            action();
            ThreadPool.QueueUserWorkItem(callback);
        }
        else
        {
            if (Interlocked.Increment(ref count) == maxConcurrentItems)
                whenDone.Set();
        }

    };

    for (int i = 0; i < maxConcurrentItems; i++)
    {
        ThreadPool.QueueUserWorkItem(callback);
    }

    whenDone.WaitOne();
}

这是另一个不使用线程池的选项,只使用固定数量的线程:

public static void Execute(IEnumerable<Action> actions, int maxConcurrentItems)
{
    Thread[] threads = new Thread[maxConcurrentItems];
    object key = new object();
    Queue<Action> queue = new Queue<Action>(actions);
    for (int i = 0; i < maxConcurrentItems; i++)
    {
        threads[i] = new Thread(new ThreadStart(delegate
        {
            Action action = null;
            do
            {
                lock (key)
                {
                    if (queue.Count > 0)
                        action = queue.Dequeue();
                    else 
                        action = null;
                }
                if (action != null)
                {
                    action();
                }
            } while (action != null);
        }));
        threads[i].Start();
    }

    for (int i = 0; i < maxConcurrentItems; i++)
    {
        threads[i].Join();
    }
}
于 2013-02-27T20:23:06.047 回答