1

我正在尝试对列表和列表中的每个项目使用 Parallel.ForEach,尝试进行数据库调用。我正在尝试记录每个项目是否有错误。只是想在这里与专家核对一下,如果我以正确的方式做事。对于此示例,我使用文件访问而不是数据库访问来模拟 I/O。

    static ConcurrentQueue<IdAndErrorMessage> queue = new ConcurrentQueue<IdAndErrorMessage>();
    private static void RunParallelForEach()
    {
      List<int> list = Enumerable.Range(1, 5).ToList<int>();
      Console.WriteLine("Start....");
      Stopwatch stopWatch = new Stopwatch();
      stopWatch.Start();
      Parallel.ForEach(list, (tempId) =>
      {
        string errorMessage = string.Empty;
        try
        {
          ComputeBoundOperationTest(tempId);
           try
           {
              Task[] task = new Task[1]
              {
               Task.Factory.StartNew(() =>  this.contentFactory.ContentFileUpdate(content, fileId))
              };
           }
           catch (Exception ex)
           {
              this.tableContentFileConversionInfoQueue.Enqueue(new ContentFileConversionInfo(fileId, ex.ToString()));
           }
        }
        catch (Exception ex)
        {
          errorMessage = ex.ToString();
        }
        if (queue.SingleOrDefault((IdAndErrorMessageObj) => IdAndErrorMessageObj.Id == tempId) == null)
        {
            queue.Enqueue(new IdAndErrorMessage(tempId, errorMessage));
        }
     }
     );
     Console.WriteLine("Stop....");
     Console.WriteLine("Total milliseconds :- " + stopWatch.ElapsedMilliseconds.ToString());
}

以下是辅助方法:-

private static byte[] FileAccess(int id)
{
    if (id == 5)
    {
      throw new ApplicationException("This is some file access exception");
    }
     return File.ReadAllBytes(Directory.GetFiles(Environment.SystemDirectory).First());
            //return File.ReadAllBytes("Files/" + fileName + ".docx");
}

 private static void ComputeBoundOperationTest(int tempId)
 {
    //Console.WriteLine("Compute-bound operation started for :- " + tempId.ToString());
    if (tempId == 4)
    {
       throw new ApplicationException("Error thrown for id = 4 from compute-bound operation");
    }
    Thread.Sleep(20);
 }

 private static void EnumerateQueue(ConcurrentQueue<IdAndErrorMessage> queue)
 {
    Console.WriteLine("Enumerating the queue items :- ");
    foreach (var item in queue)
    {
      Console.WriteLine(item.Id.ToString() + (!string.IsNullOrWhiteSpace(item.ErrorMessage) ? item.ErrorMessage : "No error"));
    }
 }
4

3 回答 3

2

没有理由这样做:

/*Below task is I/O bound - so do this Async.*/
Task[] task = new Task[1]
{
    Task.Factory.StartNew(() => FileAccess(tempId))
};
Task.WaitAll(task);

通过将其安排在一个单独的任务中,然后立即等待它,您只是占用了更多线程。您最好将其保留为:

/*Below task is I/O bound - but just call it.*/
FileAccess(tempId);

话虽如此,鉴于您正在为每个项目创建一个记录值(异常或成功),您可能需要考虑将其写入一个方法,然后将整个事物作为 PLINQ 查询调用。

例如,如果您将其写入处理 try/catch(没有线程)的方法中,并返回“记录的字符串”,即:

string ProcessItem(int id) { // ...

您可以将整个操作编写为:

var results = theIDs.AsParallel().Select(id => ProcessItem(id));
于 2012-08-15T18:05:47.483 回答
1

You might want to remove Console.WriteLine from thread code. Reason being there can be only one console per windows app. So if two or more threads going to write parallel to console, one has to wait.

In replacement to your custom error queue you might want to see .NET 4's Aggregate Exception and catch that and process exceptions accordingly. The InnerExceptions propery will give you the necessary list of exceptions. More here

And a general code review comment, don't use magic numbers like 4 in if (tempId == 4) Instead have some const defined which tells what 4 stands for. e.g. if (tempId == Error.FileMissing)

于 2012-08-15T18:53:37.380 回答
0

Parallel.ForEach最多同时运行一个动作/函数到一定数量的同时实例。如果这些迭代中的每一个在本质上都不是相互独立的,那么您就不会获得任何性能提升。而且,可能会通过引入昂贵的上下文切换和争用来降低性能。你说你想做一个“数据库调用”并用一个文件操作来模拟它。如果每次迭代都使用相同的资源(例如,数据库表中的同一行;或尝试写入同一位置的同一文件),那么它们实际上不会并行运行。一次只会运行一个,其他的将只是“等待”获取资源 - 不必要地使您的代码变得复杂。

您还没有详细说明每次迭代要做什么;但是当我和其他程序员遇到这样的情况时,他们几乎总是不会真正并行地做事情,他们只是简单地经历并替换foreachs 以Parallel.ForEach希望神奇地获得性能或神奇地利用多 CPU /核心处理器。

于 2012-08-15T18:18:12.583 回答