8

I have a program which uses POSIX timer (timer_create()). Essentially the program sets a timer and starts performing some lengthy (potentially infinite) computation. When the timer expires and a signal handler is called, the handler prints the best result yet that has been computed and quits the program.

I consider doing the computation in parallel using OpenMP, because it should speed it up.

In pthreads, there are special functions for example for setting signal masks for my threads or so. Does OpenMP provide such control, or do I have to accept the fact that the signal can be delivered to any of the threads OpenMP creates?

Also, in case I am currently in a parallel section of my code and my handler is called, can it still safely kill the application (exit(0);) and do things like locking OpenMP locks?

4

2 回答 2

2

OpenMP 3.1 标准对信号只字未提。

据我所知,Linux/UNIX 上每个流行的 OpenMP 实现都是基于 pthread 的,所以 OpenMP 线程是 pthread 的线程。并且适用 pthread 和信号的通用规则。

OpenMP 是否提供这样的控制

没有任何特定的控制;但是你可以尝试使用 pthread 的控制。唯一的问题是要知道使用了多少 OpenMP 线程以及在哪里放置控制语句。

信号可以传递到 OpenMP 创建的任何线程吗?

默认情况下,是的,它将被传递到任何线程。

我的处理程序被调用,

关于信号处理程序的通常规则仍然适用。信号处理程序中允许的函数列在http://pubs.opengroup.org/onlinepubs/009695399/functions/xsh_chap02_04.html(在页面末尾)

并且printf不允许(write是)。如果您知道在信号发出时 printf 没有被任何线程使用(例如,您在并行区域中没有 printf),您可以使用 printf。

它还能安全地杀死应用程序吗(退出(0);)

是的,它可以:abort()并且_exit()被处理程序允许。

exitLinux/Unix 将在任何线程执行或时终止所有线程abort

并做诸如锁定 OpenMP 锁之类的事情?

您不应该这样做,但是如果您知道在信号处理程序运行时不会锁定此锁,您可以尝试这样做。

!!更新

有一个采用 OpenMP 信令的示例http://www.cs.colostate.edu/~cs675/OpenMPvsThreads.pdf(“OpenMP与 C/C++ 中的线程”)。简而言之:在处理程序中设置一个标志,并在每个第 N 次循环迭代的每个线程中添加对该标志的检查。

使基于信号的异常机制适应并行区域

C/C++ 应用程序比 Fortran 应用程序更常见的是程序使用复杂的用户界面。Genehunter 是一个简单的例子,用户可以通过按下 control-C 来中断一个家谱的计算,以便它可以继续到有关该疾病的临床数据库中的下一个家谱。过早终止在串行版本中由类似 C++ 的异常机制处理,涉及信号处理程序、setjump 和 longjump。OpenMP 不允许非结构化控制流跨越并行构造边界。我们通过将中断处理程序更改为轮询机制来修改 OpenMP 版本中的异常处理。捕获 control-C 信号的线程设置一个共享标志。所有线程通过调用例程 has_hit_interrupt( ) 在循环开始时检查标志,如果设置了则跳过迭代。循环结束,master检查flag,可以轻松执行longjump完成异常退出(见图1)

于 2011-11-16T18:44:59.310 回答
2

这有点晚了,但希望这个示例代码能帮助其他处于类似位置的人!


正如 osgx 所提到的,OpenMP 对信号问题保持沉默,但由于 OpenMP 通常在 POSIX 系统上使用 pthread 实现,我们可以使用 pthread 信号方法。

对于使用 OpenMP 的繁重计算,很可能只有少数几个位置可以安全地停止计算。因此,对于想要获得过早结果的情况,我们可以使用同步信号处理来安全地执行此操作。另一个优点是这让我们可以接受来自特定 OpenMP 线程的信号(在下面的示例代码中,我们选择主线程)。捕捉到信号后,我们只需设置一个标志,指示计算应该停止。然后,每个线程应确保在方便时定期检查此标志,然后结束其工作负载份额。

通过使用这种同步方法,我们允许计算优雅地退出并且对算法的更改非常小。另一方面,所需的信号处理程序方法可能不合适,因为可能难以将每个线程的当前工作状态整理成连贯的结果。但是,同步方法的一个缺点是计算可能需要相当长的时间才能停止。

信号检测仪由三部分组成:

  • 阻断相关信号。这应该在omp parallel区域之外完成,以便每个 OpenMP 线程 (pthread) 都将继承相同的阻塞行为。
  • 从主线程轮询所需的信号。可以使用sigtimedwait此功能,但某些系统(例如 MacOS)不支持此功能。更便携,我们可以sigpending用来轮询任何阻塞的信号,然后在同步接受它们之前仔细检查阻塞的信号是我们所期望的sigwait(应该立即返回这里,除非程序的其他部分正在创建竞争条件)。我们终于设置了相关标志。
  • 我们应该在最后移除我们的信号掩码(可以选择最后一次检查信号)。

有一些重要的性能注意事项和注意事项:

  • 假设每个内部循环迭代都很小,执行信号检查系统调用的成本很高。在示例代码中,我们仅每 1000 万次(每线程)迭代检查一次信号,这可能对应于几秒钟的挂壁时间。
  • omp for循环不能脱离1,因此您必须为剩余的迭代旋转或使用更基本的 OpenMP 原语重写循环。常规循环(例如外部并行循环的内部循环)可以很好地分解。
  • 如果只有主线程可以检查信号,那么这可能会在主线程在其他线程之前完成的程序中产生问题。在这种情况下,这些其他线程将是不间断的。为了解决这个问题,您可以在每个线程完成其工作负载时“传递信号检查的指挥棒”,或者可以强制主线程继续运行和轮询,直到所有其他线程完成2
  • 在某些架构(例如 NUMA HPC)上,检查“全局”信号标志的时间可能非常昂贵,因此在决定何时何地检查或操作标志时要小心。例如,对于自旋循环部分,可能希望在标志变为真时在本地缓存该标志。

这是示例代码:

#include <signal.h>

void calculate() {
    _Bool signalled = false;
    int sigcaught;
    size_t steps_tot = 0;

    // block signals of interest (SIGINT and SIGTERM here)
    sigset_t oldmask, newmask, sigpend;
    sigemptyset(&newmask);
    sigaddset(&newmask, SIGINT);
    sigaddset(&newmask, SIGTERM);
    sigprocmask(SIG_BLOCK, &newmask, &oldmask);

    #pragma omp parallel
    {
        int rank = omp_get_thread_num();
        size_t steps = 0;

        // keep improving result forever, unless signalled
        while (!signalled) {
            #pragma omp for
            for (size_t i = 0; i < 10000; i++) {
                // we can't break from an omp for loop...
                // instead, spin away the rest of the iterations
                if (signalled) continue;

                for (size_t j = 0; j < 1000000; j++, steps++) {
                    // ***
                    // heavy computation...
                    // ***

                    // check for signal every 10 million steps
                    if (steps % 10000000 == 0) {

                        // master thread; poll for signal
                        if (rank == 0) {
                            sigpending(&sigpend);
                            if (sigismember(&sigpend, SIGINT) || sigismember(&sigpend, SIGTERM)) {
                                if (sigwait(&newmask, &sigcaught) == 0) {
                                    printf("Interrupted by %d...\n", sigcaught);
                                    signalled = true;
                                }
                            }
                        }

                        // all threads; stop computing
                        if (signalled) break;
                    }
                }
            }
        }

        #pragma omp atomic
        steps_tot += steps;
    }

    printf("The result is ... after %zu steps\n", steps_tot);

    // optional cleanup
    sigprocmask(SIG_SETMASK, &oldmask, NULL);
}

如果使用 C++,您可能会发现以下类很有用...

#include <signal.h>
#include <vector>

class Unterminable {
    sigset_t oldmask, newmask;
    std::vector<int> signals;

public:
    Unterminable(std::vector<int> signals) : signals(signals) {
        sigemptyset(&newmask);
        for (int signal : signals)
            sigaddset(&newmask, signal);
        sigprocmask(SIG_BLOCK, &newmask, &oldmask);
    }

    Unterminable() : Unterminable({SIGINT, SIGTERM}) {}

    // this can be made more efficient by using sigandset,
    // but sigandset is not particularly portable
    int poll() {
        sigset_t sigpend;
        sigpending(&sigpend);
        for (int signal : signals) {
            if (sigismember(&sigpend, signal)) {
                int sigret;
                if (sigwait(&newmask, &sigret) == 0)
                    return sigret;
                break;
            }
        }
        return -1;
    }

    ~Unterminable() {
        sigprocmask(SIG_SETMASK, &oldmask, NULL);
    }
};

的阻塞部分calculate()可以替换为Unterminable unterm();,而信号检查部分可以替换为if ((sigcaught = unterm.poll()) > 0) {...}unterm超出范围时会自动执行信号解除阻塞。


1这并不完全正确。OpenMP 支持以取消点的形式执行“并行中断”的有限支持。如果您选择在并行循环中使用取消点,请确保您确切知道隐式取消点的位置,以确保您的计算数据在取消时保持一致。

2就我个人而言,我会记录完成 for 循环的线程数,如果主线程完成循环而没有捕获到信号,它会一直轮询信号,直到它捕获到信号或所有线程都完成循环。为此,请确保标记 for 循环nowait

于 2019-05-01T12:22:24.327 回答