3

我正在为网站开发多线程抓取工具,根据另一个问题,我决定将 ThreadPool 与 QueueUserWorkItem() 一起使用。

我怎样才能不断地排队工作项而不是一次将它们全部排队?我需要排队 > 300k 个项目(每个用户 ID 一个),如果我循环将它们全部排队,我将耗尽内存。

所以,我想要的是:

// 1 = startUserID, 300000 = endUserID, 25 = MaxThreads  
Scraper webScraper = new Scraper(1, 300000, 25); 

webScraper.Start();  
// return immediately while webScraper runs in the background

在此期间,webScraper 在线程可用时不断添加所有 300000 个工作项。

这是我到目前为止所拥有的:

public class Scraper
    {
        private int MaxUserID { get; set; }
        private int MaxThreads { get; set; }
        private static int CurrentUserID { get; set; }
        private bool Running { get; set; }
        private Parser StatsParser = new Parser();


        public Scraper()
            : this(0, Int32.MaxValue, 25)
        {
        }

        public Scraper(int CurrentUserID, int MaxUserID, int MaxThreads)
        {
            this.CurrentUserID = CurrentUserID;
            this.MaxUserID = MaxUserID;
            this.MaxThreads = MaxThreads;
            this.Running = false;

            ThreadPool.SetMaxThreads(MaxThreads, MaxThreads);
        }

        public void Start()
        {
            int availableThreads;

            // Need to start a new thread to spawn the new WorkItems so Start() will return right away?
            while (Running)
            {

                // if (!CurrentUserID >= MaxUserID)
                // {
                //     while (availableThreads > 0)
                //     {
                //         ThreadPool.QueueUserWorkItem(new WaitCallBack(Process));
                //     }
                // }
                // else
                // { Running = false; }
            }
        }

        public void Stop()
        {
            Running = false;
        }

        public static void process(object state)
        {
             var userID = Interlocked.Increment(ref CurrentUserID);
             ... Fetch Stats for userID
        }
    }

这是正确的方法吗?

有人能指出我在调用 Start() 后在后台处理创建工作项的正确方向,而不是一次创建所有工作项吗?

4

5 回答 5

2

使用更少的从工作队列中窃取工作的工作项会更好地实现吗?仅仅因为你有 300,000 件工作要做,并不意味着你需要 300,000 个工人来做。显然,由于您只有几个内核,因此这些工作中只有少数可以并行发生,那么为什么不将大量工作分配给更少的工作人员呢?

根据每件工作所花费的时间的恒定程度,您可以将所有工作平均分配给每个工作人员,也可以有一个中央队列(您必须锁定),每个工作人员可以在工作用完时获取一些工作.

编辑:

Joe Duffy 似乎有一个关于在这里编写工作窃取队列的系列:http: //www.bluebytesoftware.com/blog/2008/08/12/BuildingACustomThreadPoolSeriesPart2AWorkStealingQueue.aspx。看起来 .Net 4 的 Threadpool 会变得更智能一些。但我认为对于这种情况,您不需要特别复杂的东西。

于 2009-09-10T09:15:20.010 回答
0

我认为以某种方式创建排队项目的队列似乎不太正确,那么如何让 WorkItems 在完成后再次排队呢?

您的 Start 方法可以排队,例如 3 次 MaxThreads 项目(在您的示例中为 75),然后您的 Process 方法在完成时自行排队。这样,您的 Start 方法会快速返回,但会触发许多工作项,正如我所说的那样,它们会自行触发:


    public class Scraper
    {
        private int MaxUserID { get; set; }
        private int MaxThreads { get; set; }
        private int currentUserID;
        private bool Running { get; set; }
        private Parser StatsParser = new Parser();

        private int Multiplier { get; set; }

        public Scraper()
            : this(0, Int32.MaxValue, 25)
        {
        }

        public Scraper(int currentUserID, int maxUserID, int maxThreads)
        {
            this.currentUserID = currentUserID;
            this.MaxUserID = maxUserID;
            this.MaxThreads = maxThreads;
            this.Running = false;

            ThreadPool.SetMaxThreads(maxThreads, maxThreads);
            Multiplier = 3;
        }

        public void Start()
        {
            Running = true;
            for (int i = 0; i < MaxThreads * Multiplier; i++)
            {
                ThreadPool.QueueUserWorkItem(Process);
            }
        }

        public void Stop()
        {
            Running = false;
        }

        public void Process(object state)
        {
            if (Running == false)
            {
                return;
            }
            if (currentUserID < MaxUserID)
            {
                Interlocked.Increment(ref currentUserID);
                //Parse stats for currentUserID
                ThreadPool.QueueUserWorkItem(Process);
            }
            else
            { Running = false; }
        }
    }

我确定应该使用互锁设置运行标志以确保安全。我已经将乘数变成了一个属性,它可以传递给构造函数——我很确定它可以被调整以调整性能,这取决于这些统计数据需要多长时间来解析。

于 2009-10-26T17:02:02.223 回答
0

看起来您需要一个主进程控制类来控制正在解雇的工作人员的数量并保持队列满。

您可以使用两个队列:

  1. 一个可以容纳您需要刮擦的所有物品
  2. 二是做好工作

然后,此 Master/Governor 对象将保持一个循环,直到队列 #1 中的所有项目都消失了,并且当您有可用周期时,它将继续添加到队列 #2。

于 2009-10-27T20:15:23.233 回答
0

我绝对不会使用 ThreadPool.SetMaxThreads -请记住线程池在所有进程之间共享- 设置最大线程数只会降低性能。线程池背后的整个想法是,您不需要指定诸如最大线程数之类的东西——.Net 框架会计算出要分配的最佳线程数——你不需要这样做。

请注意,排队 300 000 个项目不会导致产生 300 000 个线程 - ThreadPool 类将为您管理线程数并在必要时重新使用线程。如果您只是担心以这种方式会消耗太多资源,我建议您改进您的流程 - 也许创建一个“Spawner”类,然后运行 ​​1000 个刮板实例?

于 2009-10-26T20:04:08.237 回答
0

您可以使用不同的线程池。这是一个:http://www.codeplex.com/smartthreadpool 它允许您一次将所有项目排队。您可以分配要创建的最大线程数。假设您有 1000 个工作项并分配了 100 个线程。它将立即获取前 100 个项目并让它们继续运行,而其余的则等待。一旦这些项目之一完成并且线程释放,下一个排队的项目就会启动。它管理所有工作,但不会使线程和内存饱和。此外,它不使用 .net 线程池中的线程。

于 2009-10-28T19:54:39.980 回答