由于您已经在使用排队机制,我建议您使用 aBlockingCollection
而不是ConcurrentQueue
,以及Parallel.Invoke()
.
有一些重要的事情BlockingCollection
使它很好用。
- BlockingCollection 允许使用线程以线程安全和自然的方式从集合中获取项目,使用
foreach
.
foreach
当队列为空时,消费循环自动阻塞,并在项目可用时继续。
- BlockingCollection 提供了一种易于使用的机制来发出数据结束的信号。队列所有者只需调用
queue.CompleteAdding()
,任何foreach
从队列中获取项目的循环都会在队列完全为空时自动退出。
您可以使用Parallel.Invoke()
启动多个线程,每个线程都foreach
用于迭代队列。(Parallel.Invoke()
让你给它一个并行运行的任务数组,这使得它使用起来非常简单。)
最好用一个示例程序来说明这一点:
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Demo
{
class User
{
public string Name;
}
class Program
{
readonly BlockingCollection<User> _queue = new BlockingCollection<User>();
void run()
{
var background = Task.Factory.StartNew(process); // Start the processing threads.
// Make up 50 sample users.
var users = Enumerable.Range(0, 50).Select(n => new User{Name = n.ToString()});
foreach (var user in users) // Add some sample data.
_queue.Add(user);
Console.WriteLine("Press <RETURN> to exit.");
Console.ReadLine();
_queue.CompleteAdding(); // Makes all the consuming foreach loops exit.
background.Wait();
Console.WriteLine("Exited.");
}
void process() // Process the input queue,
{
int taskCount = 4; // Let's use 4 threads.
var actions = Enumerable.Repeat<Action>(processQueue, taskCount);
Parallel.Invoke(actions.ToArray());
}
void processQueue()
{
foreach (User user in _queue.GetConsumingEnumerable())
processUser(user);
}
void processUser(User user)
{
Console.WriteLine("Processing user " + user.Name);
Thread.Sleep(200); // Simulate work.
}
static void Main()
{
new Program().run();
}
}
}
如果您不需要限制并发线程的数量并且很乐意让 .Net 为您决定(这不是一个坏主意),那么您可以通过processQueue()
完全删除并更改process()
为:
void process() // Process the input queue,
{
Parallel.ForEach(_queue.GetConsumingEnumerable(), processUser);
}
但是,这比它需要的锁定更多,所以你最好只使用原始方法(不会遇到这个问题),或者使用这里描述的解决方案:http: //blogs.msdn.com /b/pfxteam/archive/2010/04/06/9990420.aspx