28

我正在编写一个从网页中抓取数据的 C# 控制台应用程序。

该应用程序将访问大约 8000 个网页并抓取数据(每个页面上的数据格式相同)。

我现在可以在没有异步方法和多线程的情况下使用它。

但是,我需要它更快。它只使用大约 3%-6% 的 CPU,我认为是因为它花费了等待下载 html 的时间。(WebClient.DownloadString(url))

这是我的程序的基本流程

DataSet alldata;

foreach(var url in the8000urls)
{
    // ScrapeData downloads the html from the url with WebClient.DownloadString
    // and scrapes the data into several datatables which it returns as a dataset.
    DataSet dataForOnePage = ScrapeData(url);

    //merge each table in dataForOnePage into allData
}

// PushAllDataToSql(alldata);

我一直在尝试多线程,但不知道如何正确开始。我正在使用.net 4.5,我的理解是异步的,并且在 4.5 中等待是为了使编程更容易,但我还是有点迷茫。

我的想法是继续为这条线创建异步的新线程

DataSet dataForOnePage = ScrapeData(url);

然后当每个完成时,运行

//merge each table in dataForOnePage into allData

谁能指出我如何在.net 4.5 c#中使该行异步然后让我的合并方法运行完成的正确方向?

谢谢你。

编辑:这是我的 ScrapeData 方法:

public static DataSet GetProperyData(CookieAwareWebClient webClient, string pageid)
{
    var dsPageData = new DataSet();

    // DOWNLOAD HTML FOR THE REO PAGE AND LOAD IT INTO AN HTMLDOCUMENT
    string url = @"https://domain.com?&id=" + pageid + @"restofurl";
    string html = webClient.DownloadString(url);
    var doc = new HtmlDocument();
    doc.LoadHtml(html );

    // A BUNCH OF PARSING WITH HTMLAGILITY AND STORING IN dsPageData 
    return dsPageData ;
}
4

4 回答 4

42

如果您想使用asyncandawait关键字(虽然您不必这样做,但它们确实使 .NET 4.5 中的事情变得更容易),您首先需要更改您的方法以使用关键字ScrapeData返回一个Task<T>实例,如下所示:async

async Task<DataSet> ScrapeDataAsync(Uri url)
{
    // Create the HttpClientHandler which will handle cookies.
    var handler = new HttpClientHandler();

    // Set cookies on handler.

    // Await on an async call to fetch here, convert to a data
    // set and return.
    var client = new HttpClient(handler);

    // Wait for the HttpResponseMessage.
    HttpResponseMessage response = await client.GetAsync(url);

    // Get the content, await on the string content.
    string content = await response.Content.ReadAsStringAsync();

    // Process content variable here into a data set and return.
    DataSet ds = ...;

    // Return the DataSet, it will return Task<DataSet>.
    return ds;
}

请注意,您可能希望远离WebClient该类,因为它本身不支持Task<T>其异步操作。.NET 4.5 中更好的选择是HttpClient。我选择使用HttpClient上面。此外,请查看HttpClientHandlerclass,特别是您将用于在每个请求中发送 cookie的CookieContainer属性。

但是,这意味着您很可能必须使用await关键字来等待另一个异步操作,在这种情况下,很可能是页面的下载。您必须定制下载数据的调用以使用异步版本和await那些。

一旦完成,你通常会调用await它,但在这种情况下你不能这样做,因为你会调用await一个变量。在这种情况下,您正在运行一个循环,因此每次迭代都会重置该变量。在这种情况下,最好将 存储Task<T>在一个数组中,如下所示:

DataSet alldata = ...;

var tasks = new List<Task<DataSet>>();

foreach(var url in the8000urls)
{
    // ScrapeData downloads the html from the url with 
    // WebClient.DownloadString
    // and scrapes the data into several datatables which 
    // it returns as a dataset.
    tasks.Add(ScrapeDataAsync(url));
}

存在将数据合并到allData. 为此,您希望在返回的实例上调用该ContinueWith方法Task<T>并执行将数据添加到的任务allData

DataSet alldata = ...;

var tasks = new List<Task<DataSet>>();

foreach(var url in the8000urls)
{
    // ScrapeData downloads the html from the url with 
    // WebClient.DownloadString
    // and scrapes the data into several datatables which 
    // it returns as a dataset.
    tasks.Add(ScrapeDataAsync(url).ContinueWith(t => {
        // Lock access to the data set, since this is
        // async now.
        lock (allData)
        {
             // Add the data.
        }
    });
}

然后,您可以使用上的WhenAll方法等待所有任务:Taskawait

// After your loop.
await Task.WhenAll(tasks);

// Process allData

但是,请注意,您有一个foreach, 并且WhenAll需要一个IEnumerable<T>实现。这是一个很好的指标,表明它适合使用 LINQ,它是:

DataSet alldata;

var tasks = 
    from url in the8000Urls
    select ScrapeDataAsync(url).ContinueWith(t => {
        // Lock access to the data set, since this is
        // async now.
        lock (allData)
        {
             // Add the data.
        }
    });

await Task.WhenAll(tasks);

// Process allData

如果你愿意,你也可以选择不使用查询语法,在这种情况下没关系。

请注意,如果包含方法未标记为async(因为您在控制台应用程序中并且必须在应用程序终止之前等待结果),那么您可以在调用时简单地调用返回的Wait方法TaskWhenAll

// This will block, waiting for all tasks to complete, all
// tasks will run asynchronously and when all are done, then the
// code will continue to execute.
Task.WhenAll(tasks).Wait();

// Process allData.

也就是说,关键是,您希望将Task实例收集到一个序列中,然后在处理之前等待整个序列allData

allData但是,如果可以的话,我建议在合并之前尝试处理数据;除非数据处理需要整个 .DataSet _

于 2012-07-24T21:16:59.930 回答
11

您也可以使用TPL Dataflow,它非常适合此类问题。

在这种情况下,您构建一个“数据流网格”,然后您的数据流过它。

这个实际上更像是一个管道而不是一个“网格”。我分三个步骤: 从 URL 下载(字符串)数据;将(字符串)数据解析为 HTML,然后解析为DataSet; 并合并DataSet到 masterDataSet中。

首先,我们创建将进入网格的块:

DataSet allData;
var downloadData = new TransformBlock<string, string>(
  async pageid =>
  {
    System.Net.WebClient webClient = null;
    var url = "https://domain.com?&id=" + pageid + "restofurl";
    return await webClient.DownloadStringTaskAsync(url);
  },
  new ExecutionDataflowBlockOptions
  {
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
  });
var parseHtml = new TransformBlock<string, DataSet>(
  html =>
  {
    var dsPageData = new DataSet();
    var doc = new HtmlDocument();
    doc.LoadHtml(html);

    // HTML Agility parsing

    return dsPageData;
  },
  new ExecutionDataflowBlockOptions
  {
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
  });
var merge = new ActionBlock<DataSet>(
  dataForOnePage =>
  {
    // merge dataForOnePage into allData
  });

然后我们将三个块链接在一起以创建网格:

downloadData.LinkTo(parseHtml);
parseHtml.LinkTo(merge);

接下来,我们开始将数据泵入网格:

foreach (var pageid in the8000urls)
  downloadData.Post(pageid);

最后,我们等待网格中的每个步骤完成(这也将干净地传播任何错误):

downloadData.Complete();
await downloadData.Completion;
parseHtml.Complete();
await parseHtml.Completion;
merge.Complete();
await merge.Completion;

TPL Dataflow 的好处是您可以轻松控制每个部分的并行程度。目前,我已将下载和解析块都设置为Unbounded,但您可能想要限制它们。合并块使用默认的最大并行度 1,因此合并时不需要锁。

于 2012-07-25T21:23:58.313 回答
1

我建议阅读我对/的相当完整的介绍asyncawait

首先,让一切都异步,从较低级别的东西开始:

public static async Task<DataSet> ScrapeDataAsync(string pageid)
{
  CookieAwareWebClient webClient = ...;
  var dsPageData = new DataSet();

  // DOWNLOAD HTML FOR THE REO PAGE AND LOAD IT INTO AN HTMLDOCUMENT
  string url = @"https://domain.com?&id=" + pageid + @"restofurl";
  string html = await webClient.DownloadStringTaskAsync(url).ConfigureAwait(false);
  var doc = new HtmlDocument();
  doc.LoadHtml(html);

  // A BUNCH OF PARSING WITH HTMLAGILITY AND STORING IN dsPageData 
  return dsPageData;
}

然后您可以按如下方式使用它(async与 LINQ 一起使用):

DataSet alldata;
var tasks = the8000urls.Select(async url =>
{
  var dataForOnePage = await ScrapeDataAsync(url);

  //merge each table in dataForOnePage into allData

});
await Task.WhenAll(tasks);
PushAllDataToSql(alldata);

AsyncContext从我的 AsyncEx 库中使用,因为这是一个控制台应用程序

class Program
{
  static int Main(string[] args)
  {
    try
    {
      return AsyncContext.Run(() => MainAsync(args));
    }
    catch (Exception ex)
    {
      Console.Error.WriteLine(ex);
      return -1;
    }
  }

  static async Task<int> MainAsync(string[] args)
  {
    ...
  }
}

就是这样。不需要锁定或延续或任何其他。

于 2012-07-25T20:59:52.087 回答
-1

我相信你不需要这里asyncawait东西。它们可以在需要将工作转移到非 GUI 线程的桌面应用程序中提供帮助。Parallel.ForEach在我看来,在你的情况下使用方法会更好。像这样的东西:

    DataSet alldata;
    var bag = new ConcurrentBag<DataSet>();

    Parallel.ForEach(the8000urls, url =>
    {
        // ScrapeData downloads the html from the url with WebClient.DownloadString 
        // and scrapes the data into several datatables which it returns as a dataset. 
        DataSet dataForOnePage = ScrapeData(url);
        // Add data for one page to temp bag
        bag.Add(dataForOnePage);
    });

    //merge each table in dataForOnePage into allData from bag

    PushAllDataToSql(alldata); 
于 2012-07-25T08:51:14.420 回答