8
  • 我有一个 I/O 密集型操作。
  • 我只想要一次运行最多 5 个线程。
  • 我有 8000 个任务要排队和完成。
  • 每个任务大约需要 15-20 秒来执行。

我环顾四周 ThreadPool,但是

        ThreadPool.SetMaxThreads(5, 0);

        List<task> tasks = GetTasks();

        int toProcess = tasks.Count;
        ManualResetEvent resetEvent = new ManualResetEvent(false);

        for (int i = 0; i < tasks.Count; i++)
        {
            ReportGenerator worker = new ReportGenerator(tasks[i].Code, id);
            ThreadPool.QueueUserWorkItem(x =>
            {
                worker.Go();
                if (Interlocked.Decrement(ref toProcess) == 0)
                    resetEvent.Set();
            });
        }

        resetEvent.WaitOne();

我不知道为什么......我的代码一次执行超过 5 个线程。我试过 setmaxthreads、setminthreads,但它一直在执行超过 5 个线程。

怎么了?我错过了什么?我应该以另一种方式这样做吗?

谢谢

4

5 回答 5

7

任务并行库可以帮助您:

List<task> tasks = GetTasks();

Parallel.ForEach(tasks, new ParallelOptions { MaxDegreeOfParallelism = 5 }, 
  task => {ReportGenerator worker = new ReportGenerator(task.Code, id); 
           worker.Go();});

MaxDegreeOfParallelism 有什么作用?

于 2012-07-15T03:28:13.637 回答
7

SetMaxThreads 有一个限制,您不能将其设置为低于系统上的处理器数量。如果您有 8 个处理器,则将其设置为 5 与根本不调用该函数相同。

于 2013-11-06T15:01:34.383 回答
1

我认为有一种不同的更好的方法来解决这个问题。(如果我不小心对某些语法进行了 Java 化,请原谅我)

这里的主线程在“任务”中有一个要做的事情列表——而不是为每个任务创建线程,当你有这么多项目时,这真的效率不高,而是创建所需数量的线程,然后让它们从根据需要列表。

要做的第一件事是向该代码所来自的类添加一个变量,用作指向列表的指针。我们还将为所需的最大线程数添加一个。

// New variable in your class definition
private int taskStackPointer;
private final static int MAX_THREADS = 5;

创建一个方法,该方法返回列表中的下一个任务并递增堆栈指针。然后为此创建一个新接口:

// Make sure that only one thread has access at a time
[MethodImpl(MethodImplOptions.Synchronized)] 
public task getNextTask()
{
    if( taskStackPointer < tasks.Count )
        return tasks[taskStackPointer++];
    else
        return null;
}

或者,您可以返回 tasks[taskStackPointer++].code,如果有可以指定为“列表结尾”的值。但是,这样做可能更容易。

界面:

public interface TaskDispatcher
{
     [MethodImpl(MethodImplOptions.Synchronized)] public task getNextTask();
}

在 ReportGenerator 类中,更改构造函数以接受调度程序对象:

public ReportGenerator( TaskDispatcher td, int idCode )
{
    ...
}

您还需要更改ReportGenerator类,以便处理有一个外部循环,该循环通过调用td.getNextTask()来请求新任务,并在返回 NULL 时退出循环。

最后,把线程创建代码改成这样:(这只是给你一个想法)

taskStackPointer = 0;
for (int i = 0; i < MAX_THREADS; i++) 
{ 
    ReportGenerator worker = new ReportGenerator(this,id);
    worker.Go(); 
} 

这样您就可以创建所需数量的线程并让它们都以最大容量工作。

(我不确定我是否完全正确地使用了“[MethodImpl(MethodImplOptions.Synchronized)]”...我比 C# 更习惯 Java)

于 2012-07-15T03:25:41.440 回答
1

您的任务列表中将包含 8k 项,因为您告诉代码将它们放在那里:

List<task> tasks = GetTasks();

也就是说,这个数字与使用了多少线程无关,因为调试器总是会显示您添加到列表中的项目数量。

有多种方法可以确定正在使用的线程数。也许最简单的方法之一是使用调试器进入应用程序并查看线程窗口。你不仅会得到一个计数,而且你会看到每个线程在做什么(或不做什么),这导致我......

关于您的任务正在做什么以及您如何获得一个数字来“限制”线程池,需要进行重要的讨论。在大多数用例中,线程池会做正确的事情。

现在回答你的具体问题......

要显式控制并发任务的数量,请考虑一个简单的实现,该实现将涉及将任务集合从 List 更改为 BlockingCollection(将在内部使用 ConcurrentQueue)和以下代码来“使用”工作:

var parallelOptions = new ParallelOptions
{
    MaxDegreeOfParallelism = 5
};

Parallel.ForEach(collection.GetConsumingEnumerable(), options, x =>
{
    // Do work here...
});

将 MaxDegreeOfParallelism 更改为您确定适合您正在做的工作的任何并发值。

您可能会感兴趣以下内容:

Parallel.ForEach 方法

阻塞集合

克里斯

于 2012-07-15T03:55:53.263 回答
1

它对我有用。这样你就不能使用小于“minworkerThreads”的工作线程数。问题是如果您最多需要五个“workerthreads”而“minworkerThreads”是六个则不起作用。{

ThreadPool.GetMinThreads(out minworkerThreads,out minportThreads);
ThreadPool.SetMaxThreads(minworkerThreads, minportThreads);

}

MSDN

评论

您不能将工作线程或 I/O 完成线程的最大数量设置为小于计算机上处​​理器数量的数字。要确定存在多少处理器,请检索 Environment.ProcessorCount 属性的值。此外,您不能将工作线程或 I/O 完成线程的最大数量设置为小于相应的工作线程或 I/O 完成线程的最小数量。要确定最小线程池大小,请调用 GetMinThreads 方法。

如果公共语言运行时由 Internet 信息服务 (IIS) 或 SQL Server 托管,则主机可以限制或阻止对线程池大小的更改。

更改线程池中的最大线程数时要小心。虽然您的代码可能会受益,但更改可能会对您使用的代码库产生不利影响。

将线程池大小设置得太大会导致性能问题。如果同时执行的线程太多,任务切换开销就成为一个重要因素。

于 2020-12-03T19:15:21.817 回答