17

如何将以下内容变成 Parallel.ForEach?

public async void getThreadContents(String[] threads)
{
    HttpClient client = new HttpClient();
    List<String> usernames = new List<String>();
    int i = 0;

    foreach (String url in threads)
    {
        i++;
        progressLabel.Text = "Scanning thread " + i.ToString() + "/" + threads.Count<String>();
        HttpResponseMessage response = await client.GetAsync(url);
        String content = await response.Content.ReadAsStringAsync();
        String user;
        Predicate<String> userPredicate;
        foreach (Match match in regex.Matches(content))
        {
            user = match.Groups[1].ToString();
            userPredicate = (String x) => x == user;
            if (usernames.Find(userPredicate) != user)
            {
                usernames.Add(match.Groups[1].ToString());
            }
        }
        progressBar1.PerformStep();
    }
}

我在假设异步和并行处理相同的情况下对其进行了编码,但我刚刚意识到事实并非如此。我查看了我能找到的所有问题,但我似乎真的找不到适合我的例子。他们中的大多数缺乏可读的变量名。使用不解释它们包含的内容的单字母变量名称是陈述示例的可怕方式。

我通常在名为线程的数组中有 300 到 2000 个条目(包含论坛线程的 URL),并且似乎并行处理(由于许多 HTTP 请求)会加快执行速度)。

在我可以使用 Parallel.ForEach 之前,我是否必须删除所有异步(我在 foreach 之外没有任何异步,只有变量定义)?我该怎么做呢?我可以在不阻塞主线程的情况下做到这一点吗?

顺便说一句,我正在使用 .NET 4.5。

4

4 回答 4

16

我在假设异步和并行处理是相同的情况下对其进行编码

异步处理和并行处理是完全不同的。如果您不了解其中的区别,我认为您应该先阅读更多相关内容(例如c# 中异步和并行编程之间的关系是什么?)。

现在,您想要做的实际上并不那么简单,因为您想要异步处理一个大集合,并具有特定程度的并行性 (8)。使用同步处理,您可以使用Parallel.ForEach()(以及ParallelOptions配置并行度),但没有简单的替代方法可以使用async.

在您的代码中,这很复杂,因为您希望所有内容都在 UI 线程上执行。(尽管理想情况下,您不应该直接从计算中访问 UI。相反,您应该使用IProgress,这意味着代码不再需要在 UI 线程上执行。)

在 .Net 4.5 中执行此操作的最佳方法可能是使用 TPL 数据流。它ActionBlock完全符合您的要求,但它可能非常冗长(因为它比您需要的更灵活)。所以创建一个辅助方法是有意义的:

public static Task AsyncParallelForEach<T>(
    IEnumerable<T> source, Func<T, Task> body,
    int maxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    TaskScheduler scheduler = null)
{
    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDegreeOfParallelism
    };
    if (scheduler != null)
        options.TaskScheduler = scheduler;

    var block = new ActionBlock<T>(body, options);

    foreach (var item in source)
        block.Post(item);

    block.Complete();
    return block.Completion;
}

在你的情况下,你会像这样使用它:

await AsyncParallelForEach(
    threads, async url => await DownloadUrl(url), 8,
    TaskScheduler.FromCurrentSynchronizationContext());

这里,DownloadUrl()是一种async Task处理单个 URL(循环体)的方法,8是并行度(在实际代码中可能不应该是字面常量),并FromCurrentSynchronizationContext()确保代码在 UI 线程上执行。

于 2013-02-03T15:47:39.133 回答
10

Stephen Toub 有一篇关于实现ForEachAsync. Svick 的回答对于 Dataflow 可用的平台来说非常好。

这是一个替代方案,使用 TPL 中的分区程序:

public static Task ForEachAsync<T>(this IEnumerable<T> source,
    int degreeOfParallelism, Func<T, Task> body)
{
  var partitions = Partitioner.Create(source).GetPartitions(degreeOfParallelism);
  var tasks = partitions.Select(async partition =>
  {
    using (partition) 
      while (partition.MoveNext()) 
        await body(partition.Current); 
  });
  return Task.WhenAll(tasks);
}

然后,您可以这样使用它:

public async Task getThreadContentsAsync(String[] threads)
{
  HttpClient client = new HttpClient();
  ConcurrentDictionary<String, object> usernames = new ConcurrentDictionary<String, object>();

  await threads.ForEachAsync(8, async url =>
  {
    HttpResponseMessage response = await client.GetAsync(url);
    String content = await response.Content.ReadAsStringAsync();
    String user;
    foreach (Match match in regex.Matches(content))
    {
      user = match.Groups[1].ToString();
      usernames.TryAdd(user, null);
    }
    progressBar1.PerformStep();
  });
}
于 2013-02-03T22:54:01.147 回答
3

另一种选择是使用SemaphoreSlimor AsyncSemaphore(它包含在我的 AsyncEx 库中,并且支持比 更多的平台SemaphoreSlim):

public async Task getThreadContentsAsync(String[] threads)
{
  SemaphoreSlim semaphore = new SemaphoreSlim(8);
  HttpClient client = new HttpClient();
  ConcurrentDictionary<String, object> usernames = new ConcurrentDictionary<String, object>();

  await Task.WhenAll(threads.Select(async url =>
  {
    await semaphore.WaitAsync();
    try
    {
      HttpResponseMessage response = await client.GetAsync(url);
      String content = await response.Content.ReadAsStringAsync();
      String user;
      foreach (Match match in regex.Matches(content))
      {
        user = match.Groups[1].ToString();
        usernames.TryAdd(user, null);
      }
      progressBar1.PerformStep();
    }
    finally
    {
      semaphore.Release();
    }
  }));
}
于 2013-02-03T23:04:40.563 回答
0

您可以尝试AsyncEnumerator NuGet PackageParallelForEachAsync中的扩展方法:

using System.Collections.Async;

public async void getThreadContents(String[] threads)
{
    HttpClient client = new HttpClient();
    List<String> usernames = new List<String>();
    int i = 0;

    await threads.ParallelForEachAsync(async url =>
    {
        i++;
        progressLabel.Text = "Scanning thread " + i.ToString() + "/" + threads.Count<String>();
        HttpResponseMessage response = await client.GetAsync(url);
        String content = await response.Content.ReadAsStringAsync();
        String user;
        Predicate<String> userPredicate;
        foreach (Match match in regex.Matches(content))
        {
            user = match.Groups[1].ToString();
            userPredicate = (String x) => x == user;
            if (usernames.Find(userPredicate) != user)
            {
                usernames.Add(match.Groups[1].ToString());
            }
        }

        // THIS CALL MUST BE THREAD-SAFE!
        progressBar1.PerformStep();
    },
    maxDegreeOfParallelism: 8);
}
于 2016-08-26T21:25:01.627 回答