1

我刚刚开始学习 C# 线程和并发集合,并且不确定提出问题的正确术语,所以我将简要描述我正在尝试做的事情。在这一点上,我对这个主题的理解充其量只是初步的。我的方法是否像我想象的那样可行?

  1. 我在 Concurrent 集合中有 100,000 个 url 必须进行测试——链接仍然有效吗?我有另一个并发集合,最初是空的,它将包含异步请求确定已移动的 url 子集(400、404 等错误)。

  2. 我想在我的电脑和我们的带宽允许的情况下同时产生尽可能多的这些异步请求,并且将从每秒 20 个异步网络请求任务开始,然后从那里开始。

如果单个异步任务同时处理这两件事,它会起作用吗:它会发出异步请求,然后如果遇到 4xx 错误,则将 url 添加到 BadUrls 集合中?该任务的一个新实例将每 50 毫秒产生一次:

     class TestArgs args {
        ConcurrentBag<UrlInfo> myCollection  { get; set; }
        System.Uri currentUrl  { get; set; }
     }

      ConcurrentQueue<UrlInfo> Urls = new ConncurrentQueue<UrlInfo>();
        // populate the Urls queue
        <snip>

     // initialize the bad urls collection  
      ConcurrentBag<UrlInfo> BadUrls = new ConcurrentBag<UrlInfo>();


      // timer fires every 50ms, whereupon a new args object is created
      //  and the timer callback spawns a new task; an autoEvent would
      // reset the timer and dispose of it when the queue was empty


       void SpawnNewUrlTask(){
           // if queue is empty then reset the timer
           // otherwise:
           TestArgs args = {            
               myCollection = BadUrls,              
                currentUrl = getNextUrl()  // take an item from the queue
           };
           Task.Factory.StartNew( asyncWebRequestAndConcurrentCollectionUpdater, args);
       }



       public async Task asyncWebRequestAndConcurrentCollectionUpdater(TestArgs args) 
       {
           //make the async web request 
           // add the url to the bad collection if appropriate.  
       } 

可行的?走开?

4

2 回答 2

3

该方法看起来不错,但是您显示的特定代码存在一些问题。

但在我开始之前,评论中有建议说任务并行是要走的路。我认为这是误导。有一个常见的误解是,如果您想要并行进行大量工作,则必然需要大量线程。仅当工作受计算限制时,这才是正确的。但是您所做的工作将受 IO 限制 - 此代码将花费大部分时间等待响应。它会做很少的计算。所以在实践中,即使它只使用一个线程,你最初的每秒 20 个请求的目标似乎也不像是一个会导致单个 CPU 内核出汗的工作负载。

简而言之,单个线程可以处理非常高水平的并发 IO。如果您需要并行执行代码,您只需要多个线程,而这里看起来不太可能是这种情况,因为在这个特定的工作中 CPU 的工作很少。

(这种误解早于多年。事实上,它早于 TPL - 请参阅awaithttp://www.interact-sw.co.uk/iangblog/2004/09/23/threadless了解.NET 1.1 时代的说明可以用少量线程处理数千个并发请求。基本原理今天仍然适用,因为 Windows 网络 IO 基本上仍然以相同的方式工作。)async

并不是说在这里使用多线程有什么特别的错误,我只是指出这有点分散注意力。

无论如何,回到你的代码。这条线是有问题的:

Task.Factory.StartNew( asyncWebRequestAndConcurrentCollectionUpdater, args);

虽然你没有给我们所有的代码,但我看不出它是如何编译的。接受两个参数的重载StartNew要求第一个是 an Action、 an Action<object>、 aFunc<TResult>或 a Func<object,TResult>。换句话说,它必须是一个方法,要么不接受参数,要么接受一个类型的参数object(并且可能返回值,也可能不返回值)。您的 'asyncWebRequestAndConcurrentCollectionUpdater' 采用 type 的参数TestArgs

但它不能编译的事实并不是主要问题。这很容易解决。(例如,将其更改为Task.Factory.StartNew(() => asyncWebRequestAndConcurrentCollectionUpdater(args));)真正的问题是您正在做的事情有点奇怪:您正在使用Task.StartNew调用一个已经返回Task.

Task.StartNew是一种采用同步方法(即返回 a 的方法Task)并以非阻塞方式运行它的便捷方式。(它将在线程池上运行。)但是,如果您有一个已经返回 a 的方法Task,那么您实际上不需要使用Task.StartNew. 如果我们查看Task.StartNew返回的内容(一旦您修复了编译错误),奇怪会变得更加明显:

Task<Task> t = Task.Factory.StartNew(
    () => asyncWebRequestAndConcurrentCollectionUpdater(args));

Task<Task>揭示了正在发生的事情。您已决定使用通常用于使非异步方法异步的机制来包装已经异步的方法。所以你现在有了一个Task产生Task.

稍微令人惊讶的结果之一是,如果您要等待返回的任务StartNew完成,则不一定会完成基础工作:

t.Wait(); // doesn't wait for asyncWebRequestAndConcurrentCollectionUpdater to finish!

实际上要做的就是等待asyncWebRequestAndConcurrentCollectionUpdater返回一个Task. 而且由于asyncWebRequestAndConcurrentCollectionUpdater已经是一个异步方法,它会或多或少地立即返回一个任务。(具体来说,它会在执行一个await不会立即完成的任务时返回一个任务。)

如果你想等待你开始的工作完成,你需要这样做:

t.Result.Wait();

或者,可能更有效的是:

t.Unwrap().Wait();

那就是说:让我得到我的Task异步方法返回的那个,然后等待那个。这可能与这个更简单的代码没有什么不同:

Task t = asyncWebRequestAndConcurrentCollectionUpdater("foo");
... maybe queue up some other tasks ...
t.Wait();

通过引入 `Task.Factory.StartNew',您可能没有获得任何有用的信息。

我说“可能”是因为有一个重要的限定条件:这取决于你开始工作的环境。C# 生成的代码默认情况下会尝试确保当async方法在 之后继续时await,它会在await最初执行的相同上下文中执行此操作。例如,如果您在 WPF 应用程序中并且您await在 UI 线程上,那么当代码继续执行时,它将安排在 UI 线程上执行此操作。(您可以使用 禁用此功能ConfigureAwait。)

因此,如果您处于上下文基本上是序列化的情况(因为它是单线程的,就像在 GUI 应用程序中的情况一样,或者因为它使用类似于租赁模型的东西,例如特定 ASP 的上下文.NET 请求),通过它启动异步任务实际上可能很有用,Task.Factory.StartNew因为它使您能够转义原始上下文。然而,你只是让你的生活变得更加艰难——跟踪你的任务直到完成有点复杂。并且您可能已经能够通过ConfigureAwait在您的async方法中简单地使用来实现相同的效果。

无论如何,这可能无关紧要 - 如果您只尝试每秒管理 20 个请求,那么执行此操作所需的最少 CPU 工作量意味着您可能可以在一个线程上完全充分地管理它。(另外,如果这是一个控制台应用程序,默认上下文将发挥作用,它使用线程池,因此您的任务将能够在任何情况下运行多线程。)

但回到你的问题,对我来说,有一个单身似乎是完全合理的async从队列中挑选一个 url,发出请求,检查响应,并在必要时向错误 url 集合添加一个条目的方法。并且从计时器开始处理似乎也是合理的——这将限制尝试连接的速度,而不会因响应缓慢而陷入困境(例如,如果大量请求最终试图与离线的服务器通信)。如果您遇到一些病态的情况,即您最终连续数以万计的 URL 都指向没有响应的服务器,则可能有必要为最大请求数设置一个上限。(在相关说明中,您需要确保您使用的任何 HTTP API 都不会达到任何每个客户端的连接限制 - 这可能最终会限制有效吞吐量。)

您将需要添加某种完成处理 - 仅启动异步操作而不做任何事情来处理结果是不好的做法,因为您最终可能会遇到无处可去的异常。(在 .NET 4.0 中,这些用于终止您的进程,但从 .NET 4.5 开始,默认情况下,来自异步操作的未处理异常将被简单地忽略!)如果您最终决定值得通过启动它,Task.Factory.StartNew请记住您' 最终得到了一层额外的包装,所以你需要做一些事情myTask.Unwrap().ContinueWith(...)来正确处理它。

于 2013-11-29T10:42:06.467 回答
0

当然可以。并发集合被称为“并发”,因为它们可以被多个线程同时使用,并对其行为有一些保证。

ConcurrentQueue 将确保插入其中的每个元素仅被提取一次(并发线程永远不会错误地提取相同的项目,并且一旦队列为空,则所有项目都已被线程提取)。

编辑:唯一可能出错的是 50 毫秒不足以完成请求,因此越来越多的任务累积在任务队列中。如果发生这种情况,你的记忆可能会被填满,但无论如何它都会起作用。所以是的,这是可行的。

无论如何,我想强调一个任务不是线程的事实。即使您创建了 100 个任务,框架也会决定其中多少个任务将实际并发执行。

如果你想对并行度有更多的控制,你应该使用异步请求。在您的评论中,您写了“异步 Web 请求”,但我不明白您写异步是因为它在不同的线程上还是因为您打算使用异步 API。如果您使用的是异步 API,我希望看到一些附加到完成事件的处理程序,但我看不到它,所以我假设您正在使用从异步任务发出的同步请求。如果您使用的是异步请求,那么使用任务是没有意义的,只需使用计时器来发出异步请求,因为它们已经是异步的。

当我说“异步请求”时,我指的是 WebRequest.GetResponseAsync 和 WebRequest.BeginGetResponse 等方法。

EDIT2:如果你想使用异步请求,那么你可以从定时器处理程序发出请求。该BeginGetResponse方法有两个参数。第一个是回调过程,将被调用以报告请求的状态。您可以为所有请求传递相同的过程。第二个是用户提供的对象,它将存储有关请求的状态,您可以使用此参数来区分不同的请求。你甚至可以在没有计时器的情况下做到这一点。就像是:

private readonly int desiredConcurrency = 20;

struct RequestData
{
  public UrlInfo url;
  public HttpWebRequest request;
}

/// Handles the completion of an asynchronous request
/// When a request has been completed,
/// tries to issue a new request to another url.
private void AsyncRequestHandler(IAsyncResult ar)
{
  if (ar.IsCompleted)
  {
    RequestData data = (RequestData)ar.AsyncState;
    HttpWebResponse resp = data.request.EndGetResponse(ar);
    if (resp.StatusCode != 200)
    {
      BadUrls.Add(data.url);
    }

    //A request has been completed, try to start a new one
    TryIssueRequest();
  }
}

/// If urls is not empty, dequeues a url from it
/// and issues a new request to the extracted url.
private bool TryIssueRequest()
{
  RequestData rd;
  if (urls.TryDequeue(out rd.url))
  {
    rd.request = CreateRequestTo(rd.url); //TODO implement
    rd.request.BeginGetResponse(AsyncRequestHandler, rd);
    return true;
  }
  else
  {
    return false;
  }
}

//Called by a button handler, or something like that
void StartTheRequests()
{
  for (int requestCount = 0; requestCount < desiredConcurrency; ++requestCount)
  {
    if (!TryIssueRequest()) break;
  }
}
于 2013-08-05T14:27:31.613 回答