4

我在某处的数据库队列中有无限数量的任务。让程序在 n 个不同线程上同时处理 n 个任务的最佳方法是什么,在完成旧任务时启动新任务?当一个任务完成时,另一个任务应该异步开始。当前运行的计数应始终为 n。

我最初的想法是使用线程池,但考虑到要处理的任务将在各个线程中检索,这似乎没有必要。换句话说,每个线程将自己去获取下一个任务,而不是让主线程获取任务然后分发它们。

我看到了多种选择,但我不知道应该使用哪一种来获得最佳性能。

1)线程池 - 鉴于不一定有任何等待线程,我不确定这是必要的。

2) 信号量 - 与 1 相同。如果没有等待由主线程分配的任务,信号量有什么好处?

3) 永远相同的线程 - 用 n 个线程启动程序。当一个线程完成工作时,它会自己获得下一个任务。主线程只是监视以确保 n 个线程仍然处于活动状态。

4) 事件处理 - 与 3 相同,除了当线程完成任务时,它会在死亡前触发 ImFinished 事件。ImFinished 事件处理程序启动一个新线程。这看起来就像 3,但开销更大(因为不断创建新线程)

5) 别的?

4

4 回答 4

4

BlockingCollection使整个事情变得微不足道:

var queue = new BlockingCollection<Action>();

int numWorkers = 5;

for (int i = 0; i < numWorkers; i++)
{
    Thread t = new Thread(() =>
    {
        foreach (var action in queue.GetConsumingEnumerable())
        {
            action();
        }
    });
    t.Start();
}

然后,您可以让主线程在启动工作程序之后(或之前,如果需要)将项目添加到阻塞集合中。您甚至可以生成多个生产者线程以将项目添加到队列中。

请注意,更传统的方法是使用Tasks而不是直接使用Thread类。我首先不建议这样做的主要原因是您特别要求运行的线程的确切数量(而不是最大数量),并且您对Task对象的运行方式没有太多控制(这很好;它们可以代表您进行优化)。如果该控制不像您所说的那么重要,那么以下可能最终会更可取:

var queue = new BlockingCollection<Action>();

int numWorkers = 5;

for (int i = 0; i < numWorkers; i++)
{
    Task.Factory.StartNew(() =>
    {
        foreach (var action in queue.GetConsumingEnumerable())
        {
            action();
        }
    }, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
于 2012-11-26T16:33:43.133 回答
0

这可以通过TPL 数据流库轻松实现。

首先,假设您有一个BufferBlock<T>,这是您的队列:

var queue = new BufferBlock<T>();

然后,您需要对块执行的操作,这由ActionBlock<T>表示:

var action = new ActionBlock<T>(t => { /* Process t here */ },
    new ExecutionDataflowBlockOptions {
        // Number of concurrent tasks.
        MaxDegreeOfParallelism = ..., 
    });

请注意上面的构造函数,它需要一个实例ExecutionDataflowBlockOptions并将MaxDegreeOfParallelism属性设置为您希望同时处理的多个并发项目。

在表面之下,Task Parallel Library 用于处理为任务分配线程等。TPL Dataflow 旨在成为更高级别的抽象,它允许您调整所需并行度/节流/等。

例如,如果您不希望缓冲任何项目ActionBlock<TInput>(希望它们存在于 中)BufferBlock<T>,您还可以设置BoundedCapacity属性ActionBlock<TInput>正在处理的项目,以及保留的项目):

var action = new ActionBlock<T>(t => { /* Process t here */ },
    new ExecutionDataflowBlockOptions {
        // Number of concurrent tasks.
        MaxDegreeOfParallelism = ..., 

        // Set to MaxDegreeOfParallelism to not buffer.
        BoundedCapacity ..., 
    });

此外,如果您想要一个新的、新鲜的Task<TResult>实例来处理每个项目,那么您可以将MaxMessagesPerTask属性设置为一个,表示每个都Task<TResult>将处理一个项目:

var action = new ActionBlock<T>(t => { /* Process t here */ },
    new ExecutionDataflowBlockOptions {
        // Number of concurrent tasks.
        MaxDegreeOfParallelism = ..., 

        // Set to MaxDegreeOfParallelism to not buffer.
        BoundedCapacity ..., 

        // Process once item per task.
        MaxMessagesPerTask = 1,
    });

请注意,根据您的应用程序正在运行的其他任务数量,这可能对您来说是最佳的,也可能不是最佳的,您可能还需要考虑为通过ActionBlock<TInput>.

从那里开始,通过调用方法BufferBlock<T>将 链接到是一件简单的事情:ActionBlock<TInput>LinkTo

IDisposable connection = queue.LinkTo(action, new DataflowLinkOptions {
    PropagateCompletion = true;
});

您在此处将该PropogateCompletion属性设置为 true ,以便在等待 时ActionBlock<T>,将完成发送到ActionBlock<T>您可能随后等待的(如果/当没有更多项目要处理时)。

请注意,如果您希望删除块之间的链接,您可以在调用返回的接口实现上调用该Dispose方法。IDisposableLinkTo

Post最后,您使用以下方法将项目发布到缓冲区:

queue.Post(new T());

当你完成(如果你曾经完成),你调用Complete方法

queue.Complete();

然后,在操作块上,您可以通过等待属性Task公开的实例等到它完成:Completion

action.Completion.Wait();

希望它的优雅是显而易见的:

  • 您不必管理新Task实例/线程/等的创建来管理工作,这些块会根据您提供的设置为您完成(这是基于每个块的)。
  • 更清晰的关注点分离。缓冲区与动作分离,所有其他块也是如此。您构建块,然后将它们链接在一起。
于 2012-11-26T16:55:47.533 回答
0

我喜欢模型#3,并且以前使用过它;它减少了启动和停止的线程数量,并使主线程成为真正的“主管”,减少了它必须做的工作。

正如 Servy 所指出的, System.Collections.Concurrent 命名空间有一些在这里非常有价值的结构。ConcurrentQueue 是一种线程安全的 FIFO 集合实现,旨在用于此类模型;一个或多个“生产者”线程将元素添加到队列的“输入”端,而一个或多个“消费者”从另一端取出元素。如果队列中没有任何内容,则获取项目的调用仅返回 false;您可以通过退出任务方法来对此做出反应(然后主管可以决定是否启动另一个任务,可能通过监视队列的输入并在更多项目进入时加速)。

如果队列没有任何内容,BlockingCollection 添加了导致线程在尝试从队列中获取值时等待的行为。它还可以配置为具有最大容量,超过该容量将阻止“生产者”线程添加更多元素,直到有可用容量为止。BlockingCollection 默认使用 ConcurrentQueue,但您可以根据需要将其设置为 Stack、Dictionary 或 Bag。使用此模型,您可以让任务无限期地运行;当无事可做时,它们会简单地阻塞,直到至少有一个可以处理,因此主管必须检查的只是任务出错(任何强大的线程工作流模式的关键元素)。

于 2012-11-26T16:49:18.357 回答
-1

我是一个VB的人,但你可以很容易地翻译:

Private Async Sub foo()

    Dim n As Integer = 16
    Dim l As New List(Of Task)
    Dim jobs As New Queue(Of Integer)(Enumerable.Range(1, 100))

    For i = 1 To n
        Dim j = jobs.Dequeue
        l.Add(Task.Run((Sub()
                            Threading.Thread.Sleep(500)
                            Console.WriteLine(j)
                        End Sub)))
    Next

    While l.Count > 0
        Dim t = Await Task.WhenAny(l)
        If jobs.Count > 0 Then
            Dim j = jobs.Dequeue
            l(l.IndexOf(t)) = (Task.Run((Sub()
                                             Threading.Thread.Sleep(500)
                                             Console.WriteLine(j)
                                         End Sub)))
        Else
            l.Remove(t)
        End If
    End While

End Sub

有一篇来自 Stephen Toub 的文章,为什么你不应该以这种方式使用 Task.WhenAny ......有一个大的任务列表,但是对于“一些”任务,你通常不会遇到问题

这个想法很简单:您有一个列表,您可以在其中添加尽可能多的(正在运行的)任务,以并行运行。然后你(a)等待第一个完成。如果队列中仍有作业,则将作业分配给新任务,然后 (a) 再次等待。如果队列中没有作业,您只需删除已完成的任务。如果你的任务列表和队列都是空的,你就完成了。

Stephen Toub 文章:http: //blogs.msdn.com/b/pfxteam/archive/2012/08/02/processing-tasks-as-they-complete.aspx

于 2012-11-26T16:47:12.767 回答