4

我一直在考虑让我的网络抓取工具成为多线程的,不像普通线程(egThread scrape = new Thread(Function);),而是像线程池那样可以有大量线程的线程池。

我的刮板通过使用for循环来刮页面来工作。

for (int i = (int)pagesMin.Value; i <= (int)pagesMax.Value; i++)

那么我怎么能用线程池之类的东西对函数(包含循环)进行多线程处理呢?我以前从未使用过线程池,而且我看到的示例对我来说非常混乱或晦涩难懂。


我已将循环修改为:

int min = (int)pagesMin.Value;
int max = (int)pagesMax.Value;
ParallelOptions pOptions = new ParallelOptions();
pOptions.MaxDegreeOfParallelism = Properties.Settings.Default.Threads;
Parallel.For(min, max, pOptions, i =>{
    //Scraping
});

那会起作用还是我有什么问题?

4

5 回答 5

5

使用池线程的问题在于它们大部分时间都在等待来自 Web 站点的响应。使用的问题Parallel.ForEach是它限制了你的并行性。

通过使用异步 Web 请求,我获得了最佳性能。我用 aSemaphore来限制并发请求的数量,回调函数做了抓取。

主线程创建Semaphore,如下所示:

Semaphore _requestsSemaphore = new Semaphore(20, 20);

20是通过反复试验得出的。事实证明,限制因素是 DNS 解析,平均而言,它需要大约 50 毫秒。至少,在我的环境中确实如此。20 个并发请求是绝对最大值。15可能更合理。

主线程本质上是循环的,如下所示:

while (true)
{
    _requestsSemaphore.WaitOne();
    string urlToCrawl = DequeueUrl();  // however you do that
    var request = (HttpWebRequest)WebRequest.Create(urlToCrawl);
    // set request properties as appropriate
    // and then do an asynchronous request
    request.BeginGetResponse(ResponseCallback, request);
}

ResponseCallback方法将在池线程上调用,执行处理,处理响应,然后释放信号量以便可以发出另一个请求。

void ResponseCallback(IAsyncResult ir)
{
    try
    {
        var request = (HttpWebRequest)ir.AsyncState;
        // you'll want exception handling here
        using (var response = (HttpWebResponse)request.EndGetResponse(ir))
        {
            // process the response here.
        }
    }
    finally
    {
        // release the semaphore so that another request can be made
        _requestSemaphore.Release();
    }
}

正如我所说,限制因素是 DNS 解析。事实证明,DNS 解析是在调用线程(本例中为主线程)上完成的。请参阅这真的是异步的吗?了解更多信息。

这很容易实现并且效果很好。有可能获得超过 20 个并发请求,但根据我的经验,这样做需要相当多的努力。我不得不做很多 DNS 缓存,而且……嗯,这很困难。

您可能可以通过使用TaskC# 5.0 (.NET 4.5) 中的新异步内容来简化上述内容。不过,我对那些说的不太熟悉。

于 2013-04-20T01:12:49.383 回答
3

最好使用 TPL,即Parallel.ForEach使用带有Partitioner的重载。它自动管理工作量。

供参考。你应该明白更多的线程并不意味着更快。我建议您进行一些测试以比较未参数化Parallel.ForEach和用户定义。

更新

    public void ParallelScraper(int fromInclusive, int toExclusive,
                                Action<int> scrape, int desiredThreadsCount)
    {
        int chunkSize = (toExclusive - fromInclusive +
            desiredThreadsCount - 1) / desiredThreadsCount;
        ParallelOptions pOptions = new ParallelOptions
        {
            MaxDegreeOfParallelism = desiredThreadsCount
        };

        Parallel.ForEach(Partitioner.Create(fromInclusive, toExclusive, chunkSize),
            rng =>
            {
                for (int i = rng.Item1; i < rng.Item2; i++)
                    scrape(i);
            });
    }

注意您的情况可能会更好async

于 2013-04-20T00:49:51.147 回答
2

如果你认为你的网络爬虫喜欢使用 for 循环,你可以看看类似于 foreach 循环的Parallel.ForEach() ;但是,它会迭代可枚举的数据。Parallel.ForEach使用多个线程来调用循环体。

有关更多详细信息,请参阅并行循环

更新:

Parallel.For()与Parallel.ForEach ()非常相似,它取决于您使用 for 或 foreach 循环的上下文。

于 2013-04-20T00:53:32.157 回答
0

在对我们的“ Crawler-Lib 框架”进行测试期间,我发现并行、TPL 或线程尝试不会让您获得想要的吞吐量。您在本地计算机上每秒处理 300-500 个请求。如果要并行执行数千个请求,则必须以异步模式执行它们并并行处理结果。我们的 Crawler-Lib 引擎(支持工作流的请求处理器)在本地机器上以大约 10.000 - 20.000 个请求/秒的速度执行此操作。如果你想拥有一个快速的刮刀,不要尝试使用 TPL。而是使用异步模式(开始...结束...)并在一个线程中启动所有请求。

如果您的许多请求往往会在 30 秒后超时,那么情况会更糟。在这种情况下,基于 TPL 的解决方案将获得 5 的丑陋糟糕的吞吐量?1?每秒请求数。异步模式每秒至少为您提供 100-300 个请求。Crawler-Lib 引擎可以很好地处理这个问题并获得尽可能多的请求。假设您的 TCP/IP 策略配置为具有 60000 个出站连接(最大值为 65535,因为每个连接都需要一个出站端口),那么您将获得 60000 个连接/30 秒超时 = 2000 个请求/秒的吞吐量。

于 2013-08-30T19:57:32.250 回答
0

这是 TPL Dataflow 的ActionBlock的完美场景。您可以轻松地对其进行配置以限制并发。以下是文档中的示例之一:

var downloader = new ActionBlock<string>(async url =>
{
    byte [] imageData = await DownloadAsync(url);
    Process(imageData);
}, new DataflowBlockOptions { MaxDegreeOfParallelism = 5 }); 

downloader.Post("http://msdn.com/concurrency ");
downloader.Post("http://blogs.msdn.com/pfxteam");

您可以通过下载Introduction to TPL Dataflow 来了解 ActionBlock(包括引用的示例)。

于 2013-04-20T02:57:54.673 回答