2

我想用多个处理器并行化线性运算(将复杂的数学函数拟合到某些数据集)。

假设我的机器中有 8 个内核,并且我想容纳 1000 个数据集。我期望的是一些系统将 1000 个数据集作为一个队列,并将它们发送到 8 个核心进行处理,因此它首先将 1000 个中的前 8 个作为 FIFO。每个数据集的拟合时间通常与另一个不同,因此拟合的 8 个数据集中的一些可能需要比其他数据集更长的时间。我想从系统中保存拟合数据集的结果,然后为每个已完成的线程从大队列(1000 个数据集)中继续获取新数据集。这必须继续,直到处理完整个 1000 个数据集。然后我可以继续我的程序。

这样的系统叫什么?并且在 C++ 上有模型吗?

我与 OpenMP 并行化,并使用模板和多态等高级 C++ 技术。

感谢您的任何努力。

4

2 回答 2

1

您基本上想要的是一个工作池(或线程池),它们从队列中获取作业,处理它,然后继续执行另一项作业。OpenMP 提供了不同的方法来处理这些任务,例如障碍(所有工作人员运行到某个点,并且仅在满足某个要求时才继续)或在工作人员设法计算其各自部分后将值累积到全局变量中的缩减。

您的问题非常广泛,但我可以给您的另一个提示是查看 MapReduce 范例。在这个范例中,一个函数被映射到一个数据集上,结果被排序到使用另一个函数(可能再次是相同的函数)减少的桶中。在您的情况下,这意味着您的每个处理器/核心/节点将给定函数映射到其分配的数据集,并将结果桶发送到另一个负责组合它的节点。我想如果你想在 C++ 中使用 MapReduce 并且不使用特定的 MapReduce 框架,你必须研究 MPI。当您在一个节点上运行程序时,也许您可​​以使用 OpenMP 执行类似的操作,因此在网络上搜索可能会有所帮助。

TL;DR搜索工作线程池)、障碍MapReduce

于 2012-09-27T11:35:51.217 回答
1

您可以将 OpenMP 并行用于动态计划或 OpenMP 任务。两者都可以用于并行化每次迭代需要不同时间来完成的情况。动态安排:

#pragma omp parallel
{
   Fitter fitter;
   fitter.init();
   #pragma omp for schedule(dynamic,1)
   for (int i = 0; i < numFits; i++)
      fitter.fit(..., &results[i]);
}

schedule(dynamic,1)使每个线程一次执行一次迭代,并且线程永远不会闲置,除非没有更多的迭代要处理。

有任务:

#pragma omp parallel
{
   Fitter fitter;
   fitter.init();
   #pragma omp single
   for (int i = 0; i < numFits; i++)
   {
      #pragma omp task
      fitter.fit(..., &results[i]);
   }
   #pragma omp taskwait
   // ^^^ only necessary if more code before the end of the parallel region
}

这里有一个线程运行一个 for 循环,产生 1000 个 OpenMP 任务。OMP 任务保存在队列中并由空闲线程处理。它的工作方式有点类似于动态 for 循环,但在代码结构中允许更大的自由度(例如,对于可以并行递归算法的任务)。该taskwait构造等待所有待处理的任务完成。它隐含在并行区域的末尾,因此只有在并行区域结束之前有更多代码时才真正有必要。

在这两种情况下,每次调用都fit()将在不同的线程中完成。您必须确保拟合一组参数不会影响拟合其他组,例如,这fit()是一种线程安全的方法/函数。这两种情况还要求执行时间fit()远高于 OpenMP 构造的开销。

OpenMP 任务需要符合 OpenMP 3.0 的编译器。如果您碰巧在 Windows 上开发,这排除了所有版本的 MS VC++(甚至是 VS2012 中的版本)。

如果您希望每个线程只初始化一个 fitter 实例,那么您应该采取一些不同的方法,例如使 fitter 对象成为全局对象并且threadprivate

#include <omp.h>

Fitter fitter;
#pragma omp threadprivate(fitter)

...

int main()
{
   // Disable dynamic teams
   omp_set_dynamic(0);

   // Initialise all fitters once per thread
   #pragma omp parallel
   {
      fitter.init();
   }

   ...

   #pragma omp parallel
   {
      #pragma omp for schedule(dynamic,1)
      for (int i = 0; i < numFits; i++)
         fitter.fit(..., &results[i]);
   }

   ...

   return 0;
 }

fitterFitter该类的全局实例。该omp threadprivate指令指示编译器将其放入线程本地存储中,例如使其成为每个线程的全局变量。这些在不同的平行区域之间持续存在。您也可以omp threadprivatestatic局部变量上使用。这些也在不同的并行区域之间持续存在(但仅在相同的函数中):

#include <omp.h>

int main()
{
   // Disable dynamic teams
   omp_set_dynamic(0);

   static Fitter fitter; // must be static
   #pragma omp threadprivate(fitter)

   // Initialise all fitters once per thread
   #pragma omp parallel
   {
      fitter.init();
   }

   ...

   #pragma omp parallel
   {
      #pragma omp for schedule(dynamic,1)
      for (int i = 0; i < numFits; i++)
         fitter.fit(..., &results[i]);
   }

   ...

   return 0;
 }

该调用禁用动态团队,即每个并行区域将始终使用环境变量omp_set_dynamic(0)指定的线程数执行。OMP_NUM_THREADS

于 2012-09-27T12:23:40.527 回答