6

我认为这是一个非常常见的线程场景:

  • 我有 100 个相同的工作要完成
  • 所有工作都是相互独立的
  • 我想一次最多处理 15 个作业
  • 随着每个作业的完成,将开始一个新作业,直到所有作业都完成

如果您假设每个作业在完成时都会触发一个事件(我正在使用 BackgroundWorker 类),我可以想出几种方法来解决这个问题,但我不确定“正确”的解决方案是什么。我希望你们中的一些大师能指出我正确的方向。

解决方案 1: 有一段时间(继续){ Threading.Sleep(1000); 在我的 Main( )函数中循环。Job_Completed 事件处理程序中的代码将设置 continue = false 当A)没有待排队的作业和B)所有排队的作业都已完成。我之前使用过这个解决方案,虽然它似乎工作正常......对我来说似乎有点“奇怪”。

解决方案 2: 在我的 Main() 函数中使用 Application.Run()。类似地,Job_Completed 事件处理程序中的代码将调用 Application.Exit() 当A)没有作业要排队和B)所有排队的作业都已完成。

解决方案 3: 使用 ThreadPool,将所有 500-1000 个请求排队,让它们一次运行 10 个(SetMaxThreads)并以某种方式等待它们全部完成。

在所有这些解决方案中,基本思想是每次完成另一个工作时都会启动一个新工作,直到没有工作为止。因此,问题不仅在于等待现有作业完成,还在于等待不再有任何待处理的作业开始。如果 ThreadPool 是正确的解决方案,那么在 ThreadPool 上等待完成所有排队项目的正确方法是什么?

我认为我在这里最重要的困惑是我不完全了解事件是如何从我的 Main() 函数中触发的。显然他们这样做了,我只是从 Windows 消息循环的角度不了解它的机制。解决这个问题的正确方法是什么,为什么?

4

8 回答 8

3

即使如果您想要另一个选择(您永远不会有足够的选择),其他答案也很好,那么作为一个想法怎么样。

只需将每个作业的数据放入一个结构中,该结构位于 FIFO 堆栈中。

创建 15 个线程。

每个线程将从堆栈中获取下一个作业,并将其弹出。

当一个线程完成处理时,获取下一个作业,如果堆栈为空,则线程死亡或只是休眠,等待。

唯一很容易解决的复杂性是将弹出放在关键部分(同步读取/弹出)。

于 2009-04-28T02:56:19.950 回答
2

回复:“不知何故等待他们全部完成”

ManualResetEvent是你的朋友,在你开始大批量创建这些小狗之前,在你的主线程中等待它,在工作完成后在后台操作结束时设置它。

另一种选择是手动创建线程并执行 foreach 线程 thread.Join()

你可以使用这个(我在测试期间使用这个)

     private void Repeat(int times, int asyncThreads, Action action, Action done) {
        if (asyncThreads > 0) {

            var threads = new List<Thread>();

            for (int i = 0; i < asyncThreads; i++) {

                int iterations = times / asyncThreads; 
                if (i == 0) {
                    iterations += times % asyncThreads;                    
                }

                Thread thread = new Thread(new ThreadStart(() => Repeat(iterations, 0, action, null)));
                thread.Start();
                threads.Add(thread);
            }

            foreach (var thread in threads) {
                thread.Join();
            }

        } else {
            for (int i = 0; i < times; i++) {
                action();
            }
        }
        if (done != null) {
            done();
        }
    }

用法:

// Do something 100 times in 15 background threads, wait for them all to finish.
Repeat(100, 15, DoSomething, null)
于 2009-04-28T01:09:52.810 回答
1

我只会使用任务并行库。

您可以将其作为一个简单的 Parallel.For 循环与您的任务一起执行,它会自动相当干净地管理它。如果您等不及 C# 4 和 Microsoft 的实现,一个临时的解决方法是只编译和使用TPL 的 Mono 实现。(我个人更喜欢 MS 实现,尤其是较新的 beta 版本,但 Mono 是今天功能性和可再分发的。)

于 2009-04-28T01:09:58.113 回答
1

当你在线程队列中排队一个工作项时,你应该得到一个等待句柄。将它们全部放在一个数组中,您可以将其作为参数传递给WaitAll()函数。

于 2009-04-28T01:10:07.503 回答
1

我会使用线程池。

在开始运行作业之前,请创建一个ManualResetEvent和一个 int 计数器。将每个作业添加到 ThreadPool,每次递增计数器。

在每个作业结束时,递减计数器,当它达到零时,对事件调用Set()

在您的主线程中,调用WaitOne()以等待所有作业完成。

于 2009-04-28T01:25:35.033 回答
0

这是我将如何处理它的伪代码(这不利用线程池,所以有人可能有更好的答案:)

main
{
    create queue of 100 jobs
    create new array of 15 threads
    start threads, passing each the job queue
    do whatever until threads are done
}

thread(queue)
{
    while(queue isn't empty)
    {
        lock(queue) { if queue still isn't empty dequeue a thing }
        process the thing
    }

    queue is empty so exit thread
}

编辑:如果您的问题是如何判断线程何时完成,并且您使用的是普通 C# 线程(不是 ThreadPooled 线程),您可以在每个线程上调用 Thread.Join() 并使用可选超时,它只会返回一次线程完成。如果你想在不挂断的情况下跟踪完成了多少线程,你可以像这样循环它们:

for(int i = 0; allThreads.Count > 0; i++)
{
    var thisThread = allThreads[i % threads.Count];
    if(thisThread.Join(timeout)) // something low, maybe 100 ms or something
        allThreads.Remove(thisThread);
}
于 2009-04-28T01:07:09.977 回答
0

ThreadPool可能是要走的路。该SetMaxThreads方法将能够限制正在执行的线程数。但是,这限制了进程/AppDomain 的最大线程数。SetMaxThreads如果进程作为服务运行,我不建议使用。

private static ManualResetEvent manual = new ManualResetEvent(false);
private static int count = 0;

public void RunJobs( List<JobState> states )
{
     ThreadPool.SetMaxThreads( 15, 15 );

     foreach( var state in states )
     {
          Interlocked.Increment( count );
          ThreadPool.QueueUserWorkItem( Job, state );
     }

    manual.WaitOne();
}

private static void Job( object state )
{
    // run job
    Interlocked.Decrement( count );
    if( Interlocked.Read( count ) == 0 ) manual.Set();
}
于 2009-04-28T03:06:43.933 回答
0

Microsoft 的反应式框架在这方面非常出色:

Action[] jobs = new Action[100];

var subscription =
    jobs
        .ToObservable()
        .Select(job => Observable.Start(job))
        .Merge(15)
        .Subscribe(
            x => Console.WriteLine("Job Done."),
            () => Console.WriteLine("All Jobs Done."))

完毕。

只是 NuGet“System.Reactive”。

于 2017-06-22T07:41:43.473 回答