我碰巧有一个可以做到这一点的实现。我的做事方式是将std::packaged_task
对象包装在一个抽象出返回类型的结构中。将任务提交到线程池的方法返回结果的未来。
这种工作,但由于每个任务所需的内存分配,它不适合非常短且非常频繁的任务(我试图用它来并行化流体模拟的块并且开销太高了,在324 个任务的几毫秒数量级)。
关键部分是这个结构:
struct abstract_packaged_task
{
template <typename R>
abstract_packaged_task(std::packaged_task<R> &&task):
m_task((void*)(new std::packaged_task<R>(std::move(task)))),
m_call_exec([](abstract_packaged_task *instance)mutable{
(*(std::packaged_task<R>*)instance->m_task)();
}),
m_call_delete([](abstract_packaged_task *instance)mutable{
delete (std::packaged_task<R>*)(instance->m_task);
})
{
}
abstract_packaged_task(abstract_packaged_task &&other);
~abstract_packaged_task();
void operator()();
void *m_task;
std::function<void(abstract_packaged_task*)> m_call_exec;
std::function<void(abstract_packaged_task*)> m_call_delete;
};
如您所见,它通过使用带有std::function
和 a 的lambdas 隐藏了类型依赖关系void*
。如果您知道所有可能出现的std::packaged_task
对象的最大大小(我还没有检查大小是否完全依赖R
),您可以尝试通过删除内存分配来进一步优化它。
进入线程池的提交方法然后执行此操作:
template <typename R>
std::future<R> submit_task(std::packaged_task<R()> &&task)
{
assert(m_workers.size() > 0);
std::future<R> result = task.get_future();
{
std::unique_lock<std::mutex> lock(m_queue_mutex);
m_task_queue.emplace_back(std::move(task));
}
m_queue_wakeup.notify_one();
return result;
}
m_task_queue
一个结构体std::deque
在哪里abstract_packaged_task
。m_queue_wakeup
是std::condition_variable
唤醒一个工作线程来接任务。工作线程的实现很简单:
void ThreadPool::worker_impl()
{
std::unique_lock<std::mutex> lock(m_queue_mutex, std::defer_lock);
while (!m_terminated) {
lock.lock();
while (m_task_queue.empty()) {
m_queue_wakeup.wait(lock);
if (m_terminated) {
return;
}
}
abstract_packaged_task task(std::move(m_task_queue.front()));
m_task_queue.pop_front();
lock.unlock();
task();
}
}
您可以在我的 github 上查看完整的源代码和相应的标头。