0

假设我有一些要并行运行的任务(蒙特卡洛模拟)。我想完成给定数量的任务,但任务需要不同的时间,所以不容易在线程上平均分配工作。另外:我最终需要单个向量(或数组)中的所有模拟结果。

所以我想出了以下方法:

 int Max{1000000};
 //SimResult is some struct with well-defined default value.
 std::vector<SimResult> vec(/*length*/Max);//Initialize with default values of SimResult
 int LastAdded{0};
 void fill(int RandSeed)
 { 
      Simulator sim{RandSeed};
      while(LastAdded < Max)
      {
           // Do some work to bring foo to the desired state
           //The duration of this work is subject to randomness
           vec[LastAdded++] 
                 = sim.GetResult();//Produces SimResult. 
      }
 }
 main()
 { 
       //launch a bunch of std::async that start
       auto fut1 = std::async(fill,1);
       auto fut2 = std::async(fill,2);
       //maybe some more tasks.


      fut1.get();
      fut2.get();
      //do something with the results in vec. 
 }

我猜上面的代码会给出竞争条件。我正在寻找一种高效的方法来避免这种情况。要求:避免竞争条件(填满整个数组,没有跳过);最终结果立即在数组中;高性能。

阅读各种方法,似乎 atomic 是一个不错的选择,但我不确定在我的情况下哪些设置最有效?甚至不确定 atomic 是否会削减它;也许需要一个保护 LastAdded 的互斥锁?

4

2 回答 2

3

我要说的一件事是您需要非常小心标准库随机数函数。如果你的“模拟器”类创建了一个生成器的实例,你不应该使用同一个对象并行运行蒙特卡罗模拟,因为你可能会在运行之间得到重复的随机数模式,这会给你带来不准确的结果。

这方面的最佳实践是创建 N 个具有相同属性的 Simulator 对象,并为每个对象分配一个不同的随机种子。然后,您可以使用 OpenMP 将这些对象汇集到多个线程中,这是一种用于科学软件开发的常见并行编程模型。

std::vector<SimResult> generateResults(size_t N_runs, double seed) 
{
    std::vector<SimResult> results(N_runs);
    #pragma omp parallel for
    for(auto i = 0; i < N_runs; i++)
    {
        auto sim = Simulator(seed + i);
        results[i] = sim.GetResult();
    }
}

编辑:使用 OpenMP,您可以选择不同的调度模型,例如,您可以在线程之间动态拆分工作。你可以这样做:

#pragma omp parallel for schedule(dynamic, 16)

这将使每个线程一次处理 16 个项目的块。

于 2020-03-12T10:51:14.643 回答
2

由于您已经知道要处理多少个元素并且永远不会更改向量的大小,因此最简单的解决方案是让每个线程在它自己的向量部分上工作。例如

更新

为了适应巨大变化的计算时间,你应该保留你当前的代码,但避免通过std::lock_guard. 您将需要一个std::mutex对所有线程都相同的变量,例如一个全局变量,或者将互斥锁的引用传递给每个线程。

void fill(int RandSeed, std::mutex &nextItemMutex)
{ 
      Simulator sim{RandSeed};
      size_t workingIndex;
      while(true)
      {
          {
               // enter critical area
               std::lock_guard<std::mutex> nextItemLock(nextItemMutex);

               // Acquire next item
               if(LastAdded < Max)
               {
                   workingIndex = LastAdded;
                   LastAdded++;
               } 
               else 
               {
                   break;
               }
               // lock is released when nextItemLock goes out of scope
          }

           // Do some work to bring foo to the desired state
           // The duration of this work is subject to randomness
           vec[workingIndex] = sim.GetResult();//Produces SimResult. 
      }
 }

问题在于,同步非常昂贵。但与您运行的模拟相比,它可能并没有那么昂贵,所以它应该不会太糟糕。

版本 2:

为了减少所需的同步量,您可以获取要处理的块,而不是单个项目:

void fill(int RandSeed, std::mutex &nextItemMutex, size_t blockSize)
{ 
      Simulator sim{RandSeed};
      size_t workingIndex;
      while(true)
      {
          {
               std::lock_guard<std::mutex> nextItemLock(nextItemMutex);

               if(LastAdded < Max)
               {
                   workingIndex = LastAdded;
                   LastAdded += blockSize;
               } 
               else 
               {
                   break;
               }
          }
          
          for(size_t i = workingIndex; i < workingIndex + blockSize && i < MAX; i++)
              vec[i] = sim.GetResult();//Produces SimResult. 
      }
 }

简单版

void fill(int RandSeed, size_t partitionStart, size_t partitionEnd)
{ 
      Simulator sim{RandSeed};
      for(size_t i = partitionStart; i < partitionEnd; i++)
      {
           // Do some work to bring foo to the desired state
           // The duration of this work is subject to randomness
           vec[i] = sim.GetResult();//Produces SimResult. 
      }
 }
main()
{ 
    //launch a bunch of std::async that start
    auto fut1 = std::async(fill,1, 0, Max / 2);
    auto fut2 = std::async(fill,2, Max / 2, Max);

    // ...
}
于 2020-03-12T10:41:14.267 回答