3

我有以下代码,我从源中填充用户,例如,如下所示。我想要做的是与多个消费者一起使用 BlockingCollection。

下面是正确的方法吗?最好的线程数是多少?好的,这将取决于硬件、内存等。或者我怎样才能以更好的方式做到这一点?

下面的实现是否会确保我将处理集合中的所有内容,直到它为空?

    class Program
    {
        public static readonly BlockingCollection<User> users = new BlockingCollection<User>();

        static void Main(string[] args)
        {
            for (int i = 0; i < 100000; i++)
            {
                var u = new User {Id = i, Name = "user " + i};
                users.Add(u);
            }

            Run(); 
        }

        static void Run()
        {
            for (int i = 0; i < 100; i++)
            {
                Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning);
            }
        }

        static void Process()
        {
            foreach (var user in users.GetConsumingEnumerable())
            {
                Console.WriteLine(user.Id);
            }
        }
    }

    public class User
    {
        public int Id { get; set; }
        public string Name { get; set; }
    }
4

1 回答 1

7

一些小事

  1. 您从未调用过CompleteAdding,因为不这样做,您的消费 foreach 循环将永远不会完成并永远挂起。通过users.CompleteAdding()在初始for循环之后进行修复。
  2. 您永远不会等待工作完成,Run()会启动您的 100 个线程(这可能太多了,除非您的实际过程涉及大量等待无竞争的资源)。因为任务不是前台线程,所以它们不会在您Main退出时让您的程序保持打开状态。您需要一个CountdownEvent来跟踪一切何时完成。
  3. 在您的生产者完成所有工作之前,您不会启动您的消费者,您应该将生产者分拆到一个单独的线程中或首先启动消费者,以便他们准备好工作,同时您在主线程上填充生产者.

这是带有修复程序的代码的更新版本

class Program
{
    private const int MaxThreads = 100; //way to high for this example.
    private static readonly CountdownEvent cde = new CountdownEvent(MaxThreads);
    public static readonly BlockingCollection<User> users = new BlockingCollection<User>();

    static void Main(string[] args)
    {
        Run(); 

        for (int i = 0; i < 100000; i++)
        {
            var u = new User {Id = i, Name = "user " + i};
            users.Add(u);
        }
        users.CompleteAdding();
        cde.Wait();
    }

    static void Run()
    {
        for (int i = 0; i < MaxThreads; i++)
        {
            Task.Factory.StartNew(Process, TaskCreationOptions.LongRunning);
        }
    }

    static void Process()
    {
        foreach (var user in users.GetConsumingEnumerable())
        {
            Console.WriteLine(user.Id);
        }
        cde.Signal();
    }
}

public class User
{
    public int Id { get; set; }
    public string Name { get; set; }
}

对于我之前所说的“最佳线程数”,这实际上取决于您在等待什么。

如果您正在处理的内容受 CPU 限制,则最佳线程数可能是Enviorment.ProcessorCount

如果您正在等待外部资源,但新请求不会影响旧请求(例如向 20 个不同的服务器询问信息,服务器上的负载n不会影响服务器上的负载n+1)在这种情况下,我会让Parallel .ForEach只需为您选择线程数。

如果您正在等待竞争的资源(例如读取/写入硬盘),您将根本不希望使用很多线程(甚至可能只使用一个)。我刚刚在另一个问题中发布了一个答案,当从硬盘读取时,你应该一次只使用一个线程,这样硬盘就不会到处跳来试图一次完成所有读取。

于 2013-06-21T20:51:04.110 回答