0

我一直在试图弄清楚如何解决我的需求,但是对于我的生活,我就是想不出一个解决方案。

我有一个项目数据库,将它们存储为一种队列。(数据库已经实现,其他进程将向此队列添加项目。)

这些项目需要大量的工作/时间来“处理”,所以我需要能够: 不断地从数据库中取出项目。对于每个项目,运行一个新线程并处理该项目,然后返回真/假,它已成功处理。(这将用于将其重新添加到数据库队列中)

但是仅在当前活动线程数(每个正在处理的项目一个)小于最大线程数参数时才执行此操作。

一旦达到最大线程数,我需要停止从数据库中取出项目,直到当前线程数小于最大线程数。此时它需要继续使项目出队。

感觉这应该是我能想出的东西,但它只是没有来找我。

澄清一下:我只需要实现线程。数据库已经实现。

4

3 回答 3

6

一种非常简单的方法是使用Semaphore. 您有一个线程可以使项目出列并创建线程来处理它们。例如:

const int MaxThreads = 4;
Semaphore sem = new Semaphore(MaxThreads, MaxThreads);
while (Queue.HasItems())
{
    sem.WaitOne();
    var item = Queue.Dequeue();
    Threadpool.QueueUserWorkItem(ProcessItem, item); // see below
}
// When the queue is empty, you have to wait for all processing
// threads to complete.
// If you can acquire the semaphore MaxThreads times, all workers are done
int count = 0;
while (count < MaxThreads)
{
    sem.WaitOne();
    ++count;
}

// the code to process an item
void ProcessItem(object item)
{
    // cast the item to whatever type you need,
    // and process it.
    // when done processing, release the semaphore
    sem.Release();
}

上述技术效果很好。它易于编码,易于理解且非常有效。

一项更改是您可能更愿意使用TaskAPI Threadpool.QueueUserWorkItemTask使您可以更好地控制异步处理,包括取消。我QueueUserWorkItem在我的例子中使用了,因为我更熟悉它。我会Task在生产程序中使用。

尽管这确实使用了 N+1 个线程(其中 N 是您要同时处理的项目数),但那个额外的线程通常不会做任何事情。它运行的唯一时间是在将工作分配给工作线程时。否则,它会在信号量上进行非忙碌等待。

于 2013-03-17T12:28:34.003 回答
0

你只是不知道从哪里开始?

考虑一个具有最大线程数的线程池。http://msdn.microsoft.com/en-us/library/y5htx827.aspx

考虑立即增加最大线程数并监控数据库。http://msdn.microsoft.com/en-us/library/system.threading.threadpool.queueuserworkitem.aspx很方便。

请记住,您不能保证您的流程会安全结束……崩溃会发生。考虑记录处理状态。

请记住,您的 select 和 remove-from-queue 操作应该是原子的。

于 2013-03-17T02:22:11.423 回答
0

好的,所以解决方案的架构将取决于一件事:每个队列项目的处理时间是否根据项目的数据而变化?

如果没有,那么您可以在处理线程之间进行循环。这将相当容易实现。

如果处理时间确实有所不同,那么您将需要具有更多“下一个可用”感觉的东西,这样无论哪个线程恰好是空闲的,都会首先获得处理数据项的工作。

解决了这个问题后,您将通常围绕如何在队列读取器和处理线程之间进行同步。“下一个可用”和“循环”之间的区别在于您如何进行同步。

我对 C# 并不太熟悉,但我听说过一种叫做后台工作者的野兽。这可能是实现这一目标的一种可接受的方式。

对于循环,只需为每个队列项启动一个后台工作程序,将工作人员的引用存储在一个数组中。例如,将自己限制在 16 个正在进行的后台工作人员。这个想法是,在开始 16 之后,您将在开始 17 之前等待第一个完成,依此类推。我相信后台工作人员实际上作为线程池上的作业运行,因此这将自动将任何时候实际运行的线程数限制为适合底层硬件的数量。要等待后台工作人员,请参阅this。等待后台工作人员完成后,您将处理其结果并启动另一个。

对于下一个可用的方法,它并没有太大的不同。而不是等待第一个完成,您将使用 WaitAny() 等待任何工作人员完成。您处理任何一个完成的返回,然后启动另一个并返回到 WaitAny()。

这两种方法的一般理念是始终保持多个线程处于沸腾状态。下一个可用方法的一个特点是您发出结果的顺序不一定与输入项的顺序相同。如果这很重要,那么使用比 CPU 内核更多的后台工作程序的循环方法将相当有效(线程池将刚刚开始调试但还没有运行工作程序)。然而,延迟会随着处理时间而变化。

BTW 16 是根据您认为运行该软件的 PC 上的内核数量选择的任意数字。更多的核心,更大的数量。

当然,在看似不安和不断变化的 .NET 世界中,现在可能有更好的方法来做到这一点。

祝你好运!

于 2013-03-17T11:28:52.587 回答