这有点晚了,但希望这个示例代码能帮助其他处于类似位置的人!
正如 osgx 所提到的,OpenMP 对信号问题保持沉默,但由于 OpenMP 通常在 POSIX 系统上使用 pthread 实现,我们可以使用 pthread 信号方法。
对于使用 OpenMP 的繁重计算,很可能只有少数几个位置可以安全地停止计算。因此,对于想要获得过早结果的情况,我们可以使用同步信号处理来安全地执行此操作。另一个优点是这让我们可以接受来自特定 OpenMP 线程的信号(在下面的示例代码中,我们选择主线程)。捕捉到信号后,我们只需设置一个标志,指示计算应该停止。然后,每个线程应确保在方便时定期检查此标志,然后结束其工作负载份额。
通过使用这种同步方法,我们允许计算优雅地退出并且对算法的更改非常小。另一方面,所需的信号处理程序方法可能不合适,因为可能难以将每个线程的当前工作状态整理成连贯的结果。但是,同步方法的一个缺点是计算可能需要相当长的时间才能停止。
信号检测仪由三部分组成:
- 阻断相关信号。这应该在
omp parallel
区域之外完成,以便每个 OpenMP 线程 (pthread) 都将继承相同的阻塞行为。
- 从主线程轮询所需的信号。可以使用
sigtimedwait
此功能,但某些系统(例如 MacOS)不支持此功能。更便携,我们可以sigpending
用来轮询任何阻塞的信号,然后在同步接受它们之前仔细检查阻塞的信号是我们所期望的sigwait
(应该立即返回这里,除非程序的其他部分正在创建竞争条件)。我们终于设置了相关标志。
- 我们应该在最后移除我们的信号掩码(可以选择最后一次检查信号)。
有一些重要的性能注意事项和注意事项:
- 假设每个内部循环迭代都很小,执行信号检查系统调用的成本很高。在示例代码中,我们仅每 1000 万次(每线程)迭代检查一次信号,这可能对应于几秒钟的挂壁时间。
omp for
循环不能脱离1,因此您必须为剩余的迭代旋转或使用更基本的 OpenMP 原语重写循环。常规循环(例如外部并行循环的内部循环)可以很好地分解。
- 如果只有主线程可以检查信号,那么这可能会在主线程在其他线程之前完成的程序中产生问题。在这种情况下,这些其他线程将是不间断的。为了解决这个问题,您可以在每个线程完成其工作负载时“传递信号检查的指挥棒”,或者可以强制主线程继续运行和轮询,直到所有其他线程完成2。
- 在某些架构(例如 NUMA HPC)上,检查“全局”信号标志的时间可能非常昂贵,因此在决定何时何地检查或操作标志时要小心。例如,对于自旋循环部分,可能希望在标志变为真时在本地缓存该标志。
这是示例代码:
#include <signal.h>
void calculate() {
_Bool signalled = false;
int sigcaught;
size_t steps_tot = 0;
// block signals of interest (SIGINT and SIGTERM here)
sigset_t oldmask, newmask, sigpend;
sigemptyset(&newmask);
sigaddset(&newmask, SIGINT);
sigaddset(&newmask, SIGTERM);
sigprocmask(SIG_BLOCK, &newmask, &oldmask);
#pragma omp parallel
{
int rank = omp_get_thread_num();
size_t steps = 0;
// keep improving result forever, unless signalled
while (!signalled) {
#pragma omp for
for (size_t i = 0; i < 10000; i++) {
// we can't break from an omp for loop...
// instead, spin away the rest of the iterations
if (signalled) continue;
for (size_t j = 0; j < 1000000; j++, steps++) {
// ***
// heavy computation...
// ***
// check for signal every 10 million steps
if (steps % 10000000 == 0) {
// master thread; poll for signal
if (rank == 0) {
sigpending(&sigpend);
if (sigismember(&sigpend, SIGINT) || sigismember(&sigpend, SIGTERM)) {
if (sigwait(&newmask, &sigcaught) == 0) {
printf("Interrupted by %d...\n", sigcaught);
signalled = true;
}
}
}
// all threads; stop computing
if (signalled) break;
}
}
}
}
#pragma omp atomic
steps_tot += steps;
}
printf("The result is ... after %zu steps\n", steps_tot);
// optional cleanup
sigprocmask(SIG_SETMASK, &oldmask, NULL);
}
如果使用 C++,您可能会发现以下类很有用...
#include <signal.h>
#include <vector>
class Unterminable {
sigset_t oldmask, newmask;
std::vector<int> signals;
public:
Unterminable(std::vector<int> signals) : signals(signals) {
sigemptyset(&newmask);
for (int signal : signals)
sigaddset(&newmask, signal);
sigprocmask(SIG_BLOCK, &newmask, &oldmask);
}
Unterminable() : Unterminable({SIGINT, SIGTERM}) {}
// this can be made more efficient by using sigandset,
// but sigandset is not particularly portable
int poll() {
sigset_t sigpend;
sigpending(&sigpend);
for (int signal : signals) {
if (sigismember(&sigpend, signal)) {
int sigret;
if (sigwait(&newmask, &sigret) == 0)
return sigret;
break;
}
}
return -1;
}
~Unterminable() {
sigprocmask(SIG_SETMASK, &oldmask, NULL);
}
};
的阻塞部分calculate()
可以替换为Unterminable unterm();
,而信号检查部分可以替换为if ((sigcaught = unterm.poll()) > 0) {...}
。unterm
超出范围时会自动执行信号解除阻塞。
1这并不完全正确。OpenMP 支持以取消点的形式执行“并行中断”的有限支持。如果您选择在并行循环中使用取消点,请确保您确切知道隐式取消点的位置,以确保您的计算数据在取消时保持一致。
2就我个人而言,我会记录完成 for 循环的线程数,如果主线程完成循环而没有捕获到信号,它会一直轮询信号,直到它捕获到信号或所有线程都完成循环。为此,请确保标记 for 循环nowait
。