21

我想产生线程来执行某些任务,并使用线程安全队列与它们通信。我还想在等待时对各种文件描述符进行 IO。

完成此任务的推荐方法是什么?当队列从无元素变为某些元素时,我是否必须创建一个线程间管道并写入它?没有更好的方法吗?

如果我必须创建线程间管道,为什么没有更多实现共享队列的库允许您将共享队列和线程间管道创建为单个实体?

我想这样做是否意味着根本的设计缺陷?

我在问这个关于 C++ 和 Python 的问题。我对跨平台解决方案有点兴趣,但主要对 Linux 感兴趣。

举一个更具体的例子......

我有一些代码将在文件系统树中搜索东西。我有几个通过套接字向外界开放的通信渠道。可能(或可能不会)导致需要在文件系统树中搜索内容的请求将会到达。

我将在一个或多个线程中隔离在文件系统树中搜索内容的代码。我想接受导致需要搜索树的请求,并将它们放入线程安全队列中,由搜索线程完成。结果将被放入已完成搜索的队列中。

我希望能够在搜索进行时快速处理所有非搜索请求。我希望能够及时对搜索结果采取行动。

服务传入的请求通常意味着某种事件驱动的架构,它使用epoll. 磁盘搜索请求队列和结果返回队列意味着使用互斥锁或信号量来实现线程安全的线程安全队列。

等待空队列的标准方法是使用条件变量。但是,如果我在等待时需要服务其他请求,那将不起作用。要么我最终一直轮询结果队列(平均将结果延迟轮询间隔的一半),阻塞而不为请求提供服务。

4

8 回答 8

11

每当使用事件驱动架构时,都需要一种机制来报告事件完成。在 Linux 上,如果使用文件,则需要使用 select 或 poll 系列中的某些东西,这意味着使用管道来启动所有与文件无关的事件。

编辑:Linux 有eventfdtimerfd。这些可以添加到您的列表中,并用于分别在从另一个线程或计时器事件触发时epoll突破。epoll_wait

还有另一种选择,那就是信号。可以使用fcntl修改文件描述符,以便在文件描述符变为活动状态时发出信号。然后,信号处理程序可以将文件就绪消息推送到您选择的任何类型的队列中。这可能是一个简单的信号量或 mutex/condvar 驱动的队列。由于现在不再使用select/ poll,因此不再需要使用管道来对不基于文件的消息进行排队。

健康警告:我还没有尝试过,虽然我不明白为什么它不起作用,但我真的不知道这种signal方法对性能的影响。

编辑:在信号处理程序中操作互斥锁可能是一个非常糟糕的主意。

于 2011-04-02T18:43:30.523 回答
5

我已经使用您提到的 pipe() 和 libevent(包装了 epoll)解决了这个确切的问题。当工作线程的输出队列从空变为非空时,工作线程将一个字节写入其管道 FD。这会唤醒主 IO 线程,然后它可以获取工作线程的输出。这很好用,实际上代码非常简单。

于 2011-04-02T18:20:16.353 回答
4

你有 Linux 标签,所以我将把它扔掉:POSIX 消息队列完成所有这些,如果不是你不太想要的跨平台愿望,它应该满足你的“内置”请求。

线程安全同步是内置的。您可以让您的工作线程在读取队列时阻塞。或者,当队列中有新项目时,MQ 可以使用 mq_notify() 来生成新线程(或发出现有线程的信号)。由于看起来您将要使用 select(),MQ 的标识符 (mqd_t) 可以用作 select 的文件描述符。

于 2011-04-02T18:13:05.543 回答
3

在我看来,Duck 和 twk 的答案实际上比 doron 的(由 OP 选择的那个)更好。doron 建议从信号处理程序的上下文中写入消息队列,并声明消息队列可以是“任何类型的队列”。我强烈警告您不要这样做,因为许多 C 库/系统调用无法从信号处理程序中安全地调用(请参阅async-signal-safe)。

特别是,如果您选择受互斥体保护的队列,则不应从信号处理程序访问它。考虑这种情况:您的消费者线程锁定队列以读取它。紧接着,内核传递信号通知您文件描述符现在有数据。您的信号处理程序必然在消费者线程中运行),并尝试将某些内容放入您的队列中。为此,它首先必须获得锁。但它已经持有锁,所以你现在陷入僵局。

根据我的经验,select/poll 是 UNIX/Linux 中事件驱动程序的唯一可行解决方案。我希望在多线程程序中有更好的方法,但是你需要一些机制来“唤醒”你的消费者线程。我还没有找到不涉及系统调用的方法(因为在任何阻塞调用(如 select)期间,消费者线程位于内核内部的等待队列中)。

编辑:我忘了提到使用 select/poll 时处理信号的一种特定于 Linux 的方法:signalfd(2)。您将获得一个可以选择/轮询的文件描述符,并且您处理的代码正常运行,而不是在信号处理程序的上下文中。

于 2012-02-26T15:19:48.650 回答
3

似乎还没有人提到这个选项:

不要运行select/ poll/等。在你的“主线程”中。当 I/O 操作完成时,启动一个专用的辅助线程来执行 I/O 并将通知推送到您的线程安全队列(您的其他线程用于与主线程通信的同一队列)。

然后你的主线程只需要等待通知队列。

于 2016-04-26T04:02:56.527 回答
2

这是一个非常常见的问题,尤其是在您开发网络服务器端程序时。大多数 Linux 服务器端程序的主要外观会像这样循环:

epoll_add(serv_sock);
while(1){
    ret = epoll_wait();
    foreach(ret as fd){
        req = fd.read();
        resp = proc(req);
        fd.send(resp);
    }
}

它是单线程(主线程),基于 epoll 的服务器框架。问题是,它是单线程的,而不是多线程的。它要求 proc() 永远不要阻塞或运行很长一段时间(比如常见情况下为 10 毫秒)。

如果 proc() 会运行很长时间,我们需要多线程,并在单独的线程(工作线程)中执行 proc()。

我们可以在不阻塞主线程的情况下向工作线程提交任务,使用基于互斥锁的消息队列,速度足够快。

epoll_add(serv_sock);
while(1){
    ret = epoll_wait();
    foreach(ret as fd){
        req = fd.read();
        queue.add_job(req); // fast, non blockable
    }
}

然后我们需要一种从工作线程中获取任务结果的方法。如何?如果我们只是直接检查消息队列,在 epoll_wait() 之前或之后。

epoll_add(serv_sock);
while(1){
    ret = epoll_wait(); // may blocks for 10ms
    resp = queue.check_result(); // fast, non blockable
    foreach(ret as fd){
        req = fd.read();
        queue.add_job(req); // fast, non blockable
    }
}

但是,检查动作将在 epoll_wait() 结束后执行,如果 epoll_wait() 等待的所有文件描述符都未处于活动状态,通常会阻塞 10 微秒(常见情况)。

对于服务器来说,10 毫秒是相当长的时间!当任务结果生成时,我们可以用信号 epoll_wait() 立即结束吗?

是的!我将在我的一个开源项目中描述它是如何完成的:

为所有工作线程创建一个管道,并且 epoll 也在该管道上等待。一旦生成任务结果,工作线程将一个字节写入管道,然后 epoll_wait() 将几乎同时结束!- Linux 管道有 5 us 到 20 us 的延迟。


在我的项目SSDB(与 Redis 协议兼容的磁盘内 NoSQL 数据库)中,我创建了一个 SelectableQueue 用于在主线程和工作线程之间传递消息。就像它的名字一样,SelectableQueue 有一个文件描述符,可以通过 epoll 等待。

可选队列:https ://github.com/ideawu/ssdb/blob/master/src/util/thread.h#L94

主线程中的用法:

epoll_add(serv_sock);
epoll_add(queue->fd());
while(1){
    ret = epoll_wait();
    foreach(ret as fd){
        if(fd is queue){
            sock, resp = queue->pop_result();
            sock.send(resp);
        }
        if(fd is client_socket){
            req = fd.read();
            queue->add_task(fd, req);
        }
    }
}

工作线程中的用法:

fd, req = queue->pop_task();
resp = proc(req);
queue->add_result(fd, resp);
于 2017-09-14T10:38:47.193 回答
1

C++11 有 std::mutex 和 std::condition_variable。当满足某个条件时,这两者可用于让一个线程向另一个线程发出信号。在我看来,您需要从这些原语中构建您的解决方案。如果您的环境还不支持这些 C++11 库功能,您可以在 boost 中找到非常相似的功能。抱歉,关于 python 不能说太多。

于 2011-04-02T18:06:48.680 回答
0

完成您想要做的事情的一种方法是实现观察者模式

您可以将您的主线程注册为所有衍生线程的观察者,并让他们在完成应做的事情时通知它(或在运行期间使用您需要的信息进行更新)。

基本上,您希望将方法更改为事件驱动模型。

于 2011-04-02T18:05:17.727 回答