3

尝试使用异步 CTP 编写 HTML 爬虫时,我一直不知道如何编写无递归方法来完成此任务。

这是我到目前为止的代码。

private readonly ConcurrentStack<LinkItem> _LinkStack;
private readonly Int32 _MaxStackSize;
private readonly WebClient client = new WebClient();

Func<string, string, Task<List<LinkItem>>> DownloadFromLink = async (BaseURL, uri) => 
{
    string html = await client.DownloadStringTaskAsync(uri);
    return LinkFinder.Find(html, BaseURL);
};

Action<LinkItem> DownloadAndPush = async (o) => 
{
    List<LinkItem> result = await DownloadFromLink(o.BaseURL, o.Href);
    if (this._LinkStack.Count() + result.Count <= this._MaxStackSize)
    {
        this._LinkStack.PushRange(result.ToArray());
        o.Processed = true;
    }  
};

Parallel.ForEach(this._LinkStack, (o) => 
{
    DownloadAndPush(o);
});

但显然这并不像我希望的那样起作用,因为在Parallel.ForEach执行第一次(也是唯一一次迭代)时,我只有一项。我能想到的最简单的ForEach递归方法,但我不能(我不认为)这样做,因为我会很快用完堆栈空间。

任何人都可以指导我如何重组此代码,以创建我将描述为添加项目的递归延续,直到MaxStackSize达到或系统内存不足?

4

1 回答 1

10

我认为使用 C# 5/.Net 4.5 执行此类操作的最佳方法是使用TPL Dataflow。甚至还有一个关于如何使用它来实现网络爬虫的演练

基本上,您创建一个“块”来负责下载一个 URL 并从中获取链接:

var cts = new CancellationTokenSource();

Func<LinkItem, Task<IEnumerable<LinkItem>>> downloadFromLink =
    async link =>
            {
                // WebClient is not guaranteed to be thread-safe,
                // so we shouldn't use one shared instance
                var client = new WebClient();
                string html = await client.DownloadStringTaskAsync(link.Href);

                return LinkFinder.Find(html, link.BaseURL);
            };

var linkFinderBlock = new TransformManyBlock<LinkItem, LinkItem>(
    downloadFromLink,
    new ExecutionDataflowBlockOptions
    { MaxDegreeOfParallelism = 4, CancellationToken = cts.Token });

您可以设置MaxDegreeOfParallelism为您想要的任何值。它说最多可以同时下载多少个 URL。如果您根本不想限制它,可以将其设置为DataflowBlockOptions.Unbounded.

然后你创建一个块以某种方式处理所有下载的链接,比如将它们全部存储在一个列表中。它还可以决定何时取消下载:

var links = new List<LinkItem>();

var storeBlock = new ActionBlock<LinkItem>(
    linkItem =>
    {
        links.Add(linkItem);
        if (links.Count == maxSize)
            cts.Cancel();
    });

由于我们没有设置MaxDegreeOfParallelism,它默认为 1。这意味着在这里使用不是线程安全的集合应该没问题。

我们再创建一个块:它将从 获取一个链接linkFinderBlock,并将其传递给storeBlock和传回给linkFinderBlock

var broadcastBlock = new BroadcastBlock<LinkItem>(li => li);

其构造函数中的 lambda 是一个“克隆函数”。如果您愿意,您可以使用它来创建项目的克隆,但这里不需要它,因为我们不会修改LinkItem创建后的内容。

现在我们可以将块连接在一起:

linkFinderBlock.LinkTo(broadcastBlock);
broadcastBlock.LinkTo(storeBlock);
broadcastBlock.LinkTo(linkFinderBlock);

然后我们可以通过将第一个项目提供给linkFinderBlock(或者broadcastBlock,如果您还想将其发送给storeBlock)来开始处理:

linkFinderBlock.Post(firstItem);

最后等到处理完成:

try
{
    linkFinderBlock.Completion.Wait();
}
catch (AggregateException ex)
{
    if (!(ex.InnerException is TaskCanceledException))
        throw;
}
于 2012-02-13T14:35:19.887 回答