0

我有一个生产者从资源中获取用户并将它们放入 ConcurrentQueue,然后我想要做的是使用多个消费者并处理所有用户并从另一个资源中获取他们的信息。

  public void Populate(IEnumerable<Users> users){
     _queue.Enqueue(users);
     // here single threaded
  }

  public void Process(){
     // here i want this to be processed by multiple consumers
     // say multiple threads so that I can finish processing them.
  }

我的问题是,我应该使用线程吗?任务?线程池?

我见过这个问题:Java ExecutorService.newSingleThreadExecutor() 的 C# 等效项,或者:如何序列化对资源的多线程访问

4

1 回答 1

5

由于您已经在使用排队机制,我建议您使用 aBlockingCollection而不是ConcurrentQueue,以及Parallel.Invoke().

有一些重要的事情BlockingCollection使它很好用。

  1. BlockingCollection 允许使用线程以线程安全和自然的方式从集合中获取项目,使用foreach.
  2. foreach当队列为空时,消费循环自动阻塞,并在项目可用时继续。
  3. BlockingCollection 提供了一种易于使用的机制来发出数据结束的信号。队列所有者只需调用queue.CompleteAdding(),任何foreach从队列中获取项目的循环都会在队列完全为空时自动退出。

您可以使用Parallel.Invoke()启动多个线程,每个线程都foreach用于迭代队列。(Parallel.Invoke()让你给它一个并行运行的任务数组,这使得它使用起来非常简单。)

最好用一个示例程序来说明这一点:

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    class User
    {
        public string Name;
    }

    class Program
    {
        readonly BlockingCollection<User> _queue = new BlockingCollection<User>();

        void run()
        {
            var background = Task.Factory.StartNew(process); // Start the processing threads.

            // Make up 50 sample users.
            var users = Enumerable.Range(0, 50).Select(n => new User{Name = n.ToString()});

            foreach (var user in users) // Add some sample data.
                _queue.Add(user);

            Console.WriteLine("Press <RETURN> to exit.");
            Console.ReadLine();
            _queue.CompleteAdding(); // Makes all the consuming foreach loops exit.
            background.Wait();
            Console.WriteLine("Exited.");
        }

        void process() // Process the input queue,
        {
            int taskCount = 4;  // Let's use 4 threads.
            var actions = Enumerable.Repeat<Action>(processQueue, taskCount);
            Parallel.Invoke(actions.ToArray());
        }

        void processQueue()
        {
            foreach (User user in _queue.GetConsumingEnumerable())
                processUser(user);
        }

        void processUser(User user)
        {
            Console.WriteLine("Processing user " + user.Name);
            Thread.Sleep(200); // Simulate work.
        }

        static void Main()
        {
            new Program().run();
        }
    }
}

如果您不需要限制并发线程的数量并且很乐意让 .Net 为您决定(这不是一个坏主意),那么您可以通过processQueue()完全删除并更改process()为:

void process() // Process the input queue,
{
    Parallel.ForEach(_queue.GetConsumingEnumerable(), processUser);
}

但是,这比它需要的锁定更多,所以你最好只使用原始方法(不会遇到这个问题),或者使用这里描述的解决方案:http: //blogs.msdn.com /b/pfxteam/archive/2010/04/06/9990420.aspx

于 2013-06-20T20:14:50.200 回答