26

我正在开发一个控制台应用程序。

我想使用 Threadpool 来执行网络下载。这是一些假代码。

 for (int loop=0; loop< 100; loop++)
 {
     ThreadPool.QueueUserWorkItem(new WaitCallback(GetPage), pageList[loop]);
 }


snip

private static void GetPage(object o)
{
    //get the page
}

如何防止我的代码同时启动两个以上(或十个,或其他)线程?

我试过了

    ThreadPool.SetMaxThreads(1, 0);
    ThreadPool.SetMinThreads(1, 0);

但它们似乎没有影响。

4

6 回答 6

46

我会使用Parallel.ForMaxDegreeOfParallelism相应地设置。

Parallel.For(0, 1000, new ParallelOptions { MaxDegreeOfParallelism = 10 },
  i =>
  {
    GetPage(pageList[i]);
  });
于 2012-04-27T02:22:41.780 回答
23

只需从以下代码反转该代码:

ThreadPool.SetMaxThreads(1, 0);
ThreadPool.SetMinThreads(1, 0);

到:

ThreadPool.SetMinThreads(1, 0);
ThreadPool.SetMaxThreads(1, 0);

您不能将 MaxThread 设置为小于 MinThread

于 2015-11-10T11:33:06.143 回答
10

就个人而言,我会使用SmartThreadPool并单独保留 ThreadPool。但是,这可能是您想要的:C# thread pool limited threads

链接中包含的代码(请注明原作者,而不是我)

System.Threading.Semaphore S = new System.Threading.Semaphore(3, 3);  
try 
{     
  // wait your turn (decrement)     
  S.WaitOne();     
  // do your thing 
}  
finally 
{     
  // release so others can go (increment)     
  S.Release(); 
} 
于 2012-04-26T22:08:45.567 回答
5

描述

您可以使用该ThreadPool.SetMaxThreads方法执行此操作。

但是使用 ThreadPool for WebRequest 存在一些问题。例如,阅读这个(ThreadPool 或 HttpWebRequest 中的错误?)

样本

ThreadPool.SetMaxThreads(2,2);

编辑

我个人会为此使用 AsParallel from Linq

更多信息

于 2012-04-26T22:03:13.780 回答
3

查看ThreadPool.SetMaxThreads的参数。第一个参数是工作线程的数量,第二个参数是异步线程的数量,也就是你说的那个。

在文档的下方,它说:

您不能将工作线程数或 I/O 完成线程数设置为小于计算机中处理器数的数字。

听起来您正在尝试将 ThreadPool 用于不打算用于的事情。如果您想限制下载量,请创建一个为您管理的类,因为 ThreadPool 不一定是您问题的完整解决方案。

我建议在 ThreadPool 中启动两个线程并等待回调的类。当它接收到一个线程完成的回调时,会排队一个新的线程。

于 2012-04-26T22:55:07.390 回答
3

如果您对 .Net 2.0 很紧张,您可以使用以下技术:

知道您将任务排入队列的事实ThreadPool将创建一个新线程(当然,如果没有空闲线程),您将在执行此操作之前等待,直到有一个空闲线程。为此目的,BlockingCounter使用该类(如下所述),一旦达到限制,将等待增加,直到有人(另一个线程)减少它。然后它进入“关闭”状态,表示不会完成新的增量并等待完成。

以下示例显示最多 4 个任务,总数为 10。

class Program
{

    static int s_numCurrentThreads = 0;

    static Random s_rnd = new Random();

    static void Main(string[] args)
    {

        int maxParallelTasks = 4;
        int totalTasks = 10;

        using (BlockingCounter blockingCounter = new BlockingCounter(maxParallelTasks))
        {
            for (int i = 1; i <= totalTasks; i++)
            {

                Console.WriteLine("Submitting task {0}", i);
                blockingCounter.WaitableIncrement();
                if (!ThreadPool.QueueUserWorkItem((obj) =>
                                                      {
                                                          try
                                                          {
                                                              ThreadProc(obj);
                                                          }
                                                          catch (Exception ex)
                                                          {
                                                              Console.Error.WriteLine("Task {0} failed: {1}", obj, ex.Message);
                                                          }
                                                          finally
                                                          {
                                                              // Exceptions are possible here too, 
                                                              // but proper error handling is not the goal of this sample
                                                              blockingCounter.WaitableDecrement();
                                                          }
                                                      }, i))
                {
                    blockingCounter.WaitableDecrement();
                    Console.Error.WriteLine("Failed to submit task {0} for execution.", i);
                }
            }

            Console.WriteLine("Waiting for copmletion...");
            blockingCounter.CloseAndWait(30000);
        }

        Console.WriteLine("Work done!");
        Console.ReadKey();

    }

    static void ThreadProc (object obj)
    {
        int taskNumber = (int) obj;
        int numThreads = Interlocked.Increment(ref s_numCurrentThreads);

        Console.WriteLine("Task {0} started. Total: {1}", taskNumber, numThreads);
        int sleepTime = s_rnd.Next(0, 5);
        Thread.Sleep(sleepTime * 1000);
        Console.WriteLine("Task {0} finished.", taskNumber);

        Interlocked.Decrement(ref s_numCurrentThreads);
    }

它使用基于此处发布的 Marc Gravell 的 SizeQueue 的 BlockingCounter 类,但没有计数器而不是队列。当您结束排队新线程时,调用 Close() 方法,然后等待它完成。

public class BlockingCounter : IDisposable
{
    private int m_Count;
    private object m_counterLock = new object();

    private bool m_isClosed = false;
    private volatile bool m_isDisposed = false;

    private int m_MaxSize = 0;

    private ManualResetEvent m_Finished = new ManualResetEvent(false);

    public BlockingCounter(int maxSize = 0)
    {
        if (maxSize < 0)
            throw new ArgumentOutOfRangeException("maxSize");
        m_MaxSize = maxSize;
    }


    public void WaitableIncrement(int timeoutMs = Timeout.Infinite)
    {
        lock (m_counterLock)
        {
            while (m_MaxSize > 0 && m_Count >= m_MaxSize)
            {
                CheckClosedOrDisposed();
                if (!Monitor.Wait(m_counterLock, timeoutMs))
                    throw new TimeoutException("Failed to wait for counter to decrement.");
            }

            CheckClosedOrDisposed();
            m_Count++;

            if (m_Count == 1)
            {
                Monitor.PulseAll(m_counterLock);
            }

        }
    }

    public void WaitableDecrement(int timeoutMs = Timeout.Infinite)
    {
        lock (m_counterLock)
        {
            try
            {
                while (m_Count == 0)
                {
                    CheckClosedOrDisposed();
                    if (!Monitor.Wait(m_counterLock, timeoutMs))
                        throw new TimeoutException("Failed to wait for counter to increment.");
                }

                CheckDisposed();

                m_Count--;

                if (m_MaxSize == 0 || m_Count == m_MaxSize - 1)
                    Monitor.PulseAll(m_counterLock);
            }
            finally
            {
                if (m_isClosed && m_Count == 0)
                    m_Finished.Set();
            }
        }
    }

    void CheckClosedOrDisposed()
    {
        if (m_isClosed)
            throw new Exception("The counter is closed");
        CheckDisposed();
    }

    void CheckDisposed()
    {
        if (m_isDisposed)
            throw new ObjectDisposedException("The counter has been disposed.");
    }

    public void Close()
    {
        lock (m_counterLock)
        {
            CheckDisposed();
            m_isClosed = true;
            Monitor.PulseAll(m_counterLock);
        }
    }

    public bool WaitForFinish(int timeoutMs = Timeout.Infinite)
    {
        CheckDisposed();
        lock (m_counterLock)
        { 
             if (m_Count == 0)
                 return true;
        }
        return m_Finished.WaitOne(timeoutMs);
    }

    public void CloseAndWait (int timeoutMs = Timeout.Infinite)
    {
        Close();
        WaitForFinish(timeoutMs);
    }

    public void Dispose()
    {
        if (!m_isDisposed)
        {
            m_isDisposed = true;
            lock (m_counterLock)
            {
                // Wake up all waiting threads, so that they know the object 
                // is disposed and there's nothing to wait anymore
                Monitor.PulseAll(m_counterLock);
            }
            m_Finished.Close();
        }
    }
}

结果将是这样的:

Submitting task 1
Submitting task 2
Submitting task 3
Submitting task 4
Submitting task 5
Task 1 started. Total: 1
Task 1 finished.
Task 3 started. Total: 1
Submitting task 6
Task 2 started. Total: 2
Task 3 finished.
Task 6 started. Total: 4
Task 5 started. Total: 3
Task 4 started. Total: 4
Submitting task 7
Task 4 finished.
Submitting task 8
Task 7 started. Total: 4
Task 5 finished.
Submitting task 9
Task 7 finished.
Task 8 started. Total: 4
Task 9 started. Total: 4
Submitting task 10
Task 2 finished.
Waiting for copmletion...
Task 10 started. Total: 4
Task 10 finished.
Task 6 finished.
Task 8 finished.
Task 9 finished.
Work done!
于 2013-09-16T21:40:33.333 回答