1

在我的 C++ 程序中,我使用该lio_listio调用一次发送许多(最多几百个)写请求。之后,我做了一些计算,当我完成后,我需要等待所有未完成的请求完成,然后才能提交下一批请求。我怎样才能做到这一点?

现在,我只是aio_suspend循环调用,每次调用一个请求,但这看起来很难看。看起来我应该使用struct sigevent *sevp参数来lio_listio. 我目前的猜测是我应该做这样的事情:

  • 在主线程中,创建一个互斥锁并在调用lio_listio.
  • 在对 的调用中lio_listio,指定解锁此互斥锁的通知函数/信号处理程序。

这应该给我想要的行为,但它会可靠地工作吗?是否允许从信号处理程序上下文中操作互斥锁?我读到 pthread 互斥锁可以提供错误检测,并且如果您尝试从同一个线程再次锁定它们或从不同的线程解锁它们,则此解决方案依赖于死锁。

示例代码,使用信号处理程序:

void notify(int, siginfo_t *info, void *) {
    pthread_mutex_unlock((pthread_mutex_t *) info->si_value);
}

void output() {
    pthread_mutex_t iomutex = PTHREAD_MUTEX_INITIALIZER;

    struct sigaction act;
    memset(&act, 0, sizeof(struct sigaction));
    act.sa_sigaction = &notify;
    act.sa_flags = SA_SIGINFO;
    sigaction(SIGUSR1, &act, NULL);

    for (...) {
        pthread_mutex_lock(&iomutex);

        // do some calculations here...

        struct aiocb *cblist[];
        int cbno;
        // set up the aio request list - omitted

        struct sigevent sev;
        memset(&sev, 0, sizeof(struct sigevent));
        sev.sigev_notify = SIGEV_SIGNAL;
        sev.sigev_signo = SIGUSR1;
        sev.sigev_value.sival_ptr = &iomutex;

        lio_listio(LIO_NOWAIT, cblist, cbno, &sev);
    }

    // ensure that the last queued operation completes
    // before this function returns
    pthread_mutex_lock(&iomutex);
    pthread_mutex_unlock(&iomutex);
}

示例代码,使用通知功能 - 可能效率较低,因为创建了一个额外的线程:

void output() {
    pthread_mutex_t iomutex = PTHREAD_MUTEX_INITIALIZER;

    for (...) {
        pthread_mutex_lock(&iomutex);

        // do some calculations here...

        struct aiocb *cblist[];
        int cbno;
        // set up the aio request list - omitted

        struct sigevent sev;
        memset(&sev, 0, sizeof(struct sigevent));
        sev.sigev_notify = SIGEV_THREAD;
        sev_sigev_notify_function = &pthread_mutex_unlock;
        sev.sigev_value.sival_ptr = &iomutex;

        lio_listio(LIO_NOWAIT, cblist, cbno, &sev);
    }

    // ensure that the last queued operation completes
    // before this function returns
    pthread_mutex_lock(&iomutex);
    pthread_mutex_unlock(&iomutex);
}
4

1 回答 1

3

如果您在 lio_listio() 调用中设置 sigevent 参数,则当该特定调用中的所有作业完成时,您将收到一个信号(或函数调用)通知。您仍然需要:

  1. 等到您收到与发出 lio_listio() 调用一样多的通知时,才能知道它们何时完成。

  2. 使用某种安全机制从信号处理程序与主线程进行通信,可能通过全局变量(可移植)。

如果您使用的是 linux,我建议您将 eventfd 绑定到您的 sigevent 并等待。这更加灵活,因为您不需要涉及信号处理程序。在 BSD(但不是 Mac OS)上,您可以使用 kqueue 在 aiocbs 上等待,而在 solaris/illumos 上,您可以使用端口来获取 aiocb 完成的通知。

以下是如何在 linux 上使用 eventfds的示例:

附带说明一下,在使用 lio_listio 发布作业时我会谨慎行事。您不能保证它支持接受超过2 个工作,并且某些系统对您一次可以发布的数量的限制非常低。例如,Mac OS 上的默认值为 16。此限制可以定义为 AIO_LISTIO_MAX 宏,但不一定。在这种情况下,您需要调用 sysconf(_SC_AIO_LISTIO_MAX) (请参阅文档)。有关详细信息,请参阅lio_listio 文档

您至少应该检查 lio_listio() 调用中的错误情况。

此外,您使用互斥锁的解决方案是次优的,因为您将同步 for 循环中的每个循环,并且一次只运行一个(除非它是一个递归互斥锁,但在这种情况下,如果您的信号,它的状态可能会损坏处理程序恰好落在不同的线程上)。

更合适的原语可能是信号量,它在处理程序中释放,然后(在您的 for 循环之后)获取与您循环相同的次数,调用 lio_listio()。但是,如果可以特定于 linux,我仍然会推荐一个 eventfd。

于 2013-02-10T18:44:37.160 回答