174

相关问题

关于 C++11:

关于升压:


如何获得一个线程池将任务发送到,而无需一遍又一遍地创建和删除它们?这意味着持久线程无需加入即可重新同步。


我的代码如下所示:

namespace {
  std::vector<std::thread> workers;

  int total = 4;
  int arr[4] = {0};

  void each_thread_does(int i) {
    arr[i] += 2;
  }
}

int main(int argc, char *argv[]) {
  for (int i = 0; i < 8; ++i) { // for 8 iterations,
    for (int j = 0; j < 4; ++j) {
      workers.push_back(std::thread(each_thread_does, j));
    }
    for (std::thread &t: workers) {
      if (t.joinable()) {
        t.join();
      }
    }
    arr[4] = std::min_element(arr, arr+4);
  }
  return 0;
}

与其在每次迭代中创建和加入线程,我更愿意在每次迭代时将任务发送到我的工作线程并且只创建一次。

4

9 回答 9

144

这是从我对另一个非常相似的帖子的回答中复制的:

  1. 从系统可以支持的最大线程数开始:

    int num_threads = std::thread::hardware_concurrency();
    
  2. 对于一个高效的线程池实现,一旦根据 线程创建num_threads,最好不要创建新线程,或者销毁旧线程(通过加入)。会有性能损失,甚至可能使您的应用程序运行速度比串行版本慢。

每个 C++11 线程都应该在其函数中以无限循环运行,不断等待新任务的抓取和运行。

以下是如何将这样的函数附加到线程池:

int num_threads = std::thread::hardware_concurrency();
std::vector<std::thread> threads;
for (int i = 0; i < num_threads; i++)
{
    pool.push_back(std::thread(Infinite_loop_function));
}
  1. 无限循环功能。这是一个while (true)等待任务队列的循环。
void Pool::Infinite_loop_function()
{
    while (true)
    {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);

            condition.wait(lock, [this](){
                return !queue.empty() || terminate_pool;
            });
            Job = queue.front();
            queue.pop();
        }

        Job(); // function<void()> type
    }
};
  1. 制作一个将作业添加到队列的功能
void Pool::Add_Job(function<void()> New_Job)
{
    {
        std::unique_lock<std::mutex> lock(queue_mutex);
        queue.push(New_Job);
    }

    condition.notify_one();
}
  1. 将任意函数绑定到您的队列
Pool_Obj.Add_Job(std::bind(&Some_Class::Some_Method, &Some_object));

一旦你整合了这些成分,你就有了自己的动态线程池。这些线程始终运行,等待工作完成。

如果有一些语法错误,我深表歉意,我输入了这段代码,我记性不好。抱歉,我无法为您提供完整的线程池代码;那会违反我的职业操守。

编辑:终止池,调用shutdown()方法:

Pool::shutdown()
{
    {
        std::unique_lock<std::mutex> lock(threadpool_mutex);
        terminate_pool = true; // use this flag in condition.wait
    }

    condition.notify_all(); // wake up all threads.

    // Join all threads.
    for (std::thread &th : threads)
    {
        th.join();
    }

    pool.clear();  
    stopped = true; // use this flag in destructor, if not set, call shutdown() 
}

注意:使用匿名代码块,以便当它们退出时,std::unique_lock在其中创建的变量会超出范围,从而解锁互斥锁。

于 2015-09-15T19:12:14.177 回答
100

您可以使用 C++ 线程池库,https://github.com/vit-vit/ctpl

然后您编写的代码可以替换为以下内容

#include <ctpl.h>  // or <ctpl_stl.h> if ou do not have Boost library

int main (int argc, char *argv[]) {
    ctpl::thread_pool p(2 /* two threads in the pool */);
    int arr[4] = {0};
    std::vector<std::future<void>> results(4);
    for (int i = 0; i < 8; ++i) { // for 8 iterations,
        for (int j = 0; j < 4; ++j) {
            results[j] = p.push([&arr, j](int){ arr[j] +=2; });
        }
        for (int j = 0; j < 4; ++j) {
            results[j].get();
        }
        arr[4] = std::min_element(arr, arr + 4);
    }
}

您将获得所需数量的线程,并且不会在迭代中一遍又一遍地创建和删除它们。

于 2014-09-05T11:20:57.370 回答
72

线程池意味着您的所有线程一直在运行——换句话说,线程函数永远不会返回。为了给线程做一些有意义的事情,你必须设计一个线程间通信系统,既是为了告诉线程有事要做,也是为了传达实际的工作数据。

通常这将涉及某种并发数据结构,并且每个线程可能会在某种条件变量上休眠,当有工作要做时会通知它。收到通知后,一个或多个线程唤醒,从并发数据结构中恢复任务,处理它,并以类似的方式存储结果。

然后该线程将继续检查是否还有更多工作要做,如果没有则继续睡眠。

结果是您必须自己设计所有这些,因为没有普遍适用的“工作”的自然概念。这是一项相当多的工作,并且您必须解决一些微妙的问题。(如果你喜欢一个在幕后为你处理线程管理的系统,你可以在 Go 中编程。)

于 2013-04-01T22:27:10.370 回答
21

线程池的核心是一组线程,所有线程都绑定到作为事件循环工作的函数。这些线程将无休止地等待一个任务被执行,或者它们自己的终止。

线程池作业是提供一个提交作业的接口,定义(也许还可以修改)运行这些作业的策略(调度规则、线程实例化、池大小),并监控线程和相关资源的状态。

因此,对于一个多功能池,必须首先定义一个任务是什么,它如何启动、中断、结果是什么(请参阅该问题的 promise 和 future 的概念)、线程必须响应的事件类型to,他们将如何处理它们,如何将这些事件与任务处理的事件区分开来。正如您所看到的,这可能会变得非常复杂,并且随着解决方案变得越来越多,会对线程的工作方式施加限制。

当前处理事件的工具相当简单(*):诸如互斥锁、条件变量之类的原语,以及在此之上的一些抽象(锁、屏障)。但在某些情况下,这些抽象可能会变得不合适(请参阅这个相关问题),必须恢复使用原语。

还必须管理其他问题:

  • 信号
  • 输入/输出
  • 硬件(处理器亲和性,异构设置)

这些将如何在您的环境中发挥作用?

这个对类似问题的回答指向一个用于 boost 和 stl 的现有实现。

我为另一个问题提供了一个非常粗略的线程池实现,它没有解决上面列出的许多问题。你可能想建立在它之上。您可能还想看看其他语言的现有框架,以寻找灵感。


(*) 我不认为这是一个问题,恰恰相反。我认为这是从 C 继承而来的 C++ 精神。

于 2013-04-01T23:33:43.080 回答
12
Follwoing [PhD EcE](https://stackoverflow.com/users/3818417/phd-ece) suggestion, I implemented the thread pool:

function_pool.h

#pragma once
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <cassert>

class Function_pool
{

private:
    std::queue<std::function<void()>> m_function_queue;
    std::mutex m_lock;
    std::condition_variable m_data_condition;
    std::atomic<bool> m_accept_functions;

public:

    Function_pool();
    ~Function_pool();
    void push(std::function<void()> func);
    void done();
    void infinite_loop_func();
};

函数池.cpp

#include "function_pool.h"

Function_pool::Function_pool() : m_function_queue(), m_lock(), m_data_condition(), m_accept_functions(true)
{
}

Function_pool::~Function_pool()
{
}

void Function_pool::push(std::function<void()> func)
{
    std::unique_lock<std::mutex> lock(m_lock);
    m_function_queue.push(func);
    // when we send the notification immediately, the consumer will try to get the lock , so unlock asap
    lock.unlock();
    m_data_condition.notify_one();
}

void Function_pool::done()
{
    std::unique_lock<std::mutex> lock(m_lock);
    m_accept_functions = false;
    lock.unlock();
    // when we send the notification immediately, the consumer will try to get the lock , so unlock asap
    m_data_condition.notify_all();
    //notify all waiting threads.
}

void Function_pool::infinite_loop_func()
{
    std::function<void()> func;
    while (true)
    {
        {
            std::unique_lock<std::mutex> lock(m_lock);
            m_data_condition.wait(lock, [this]() {return !m_function_queue.empty() || !m_accept_functions; });
            if (!m_accept_functions && m_function_queue.empty())
            {
                //lock will be release automatically.
                //finish the thread loop and let it join in the main thread.
                return;
            }
            func = m_function_queue.front();
            m_function_queue.pop();
            //release the lock
        }
        func();
    }
}

主文件

#include "function_pool.h"
#include <string>
#include <iostream>
#include <mutex>
#include <functional>
#include <thread>
#include <vector>

Function_pool func_pool;

class quit_worker_exception : public std::exception {};

void example_function()
{
    std::cout << "bla" << std::endl;
}

int main()
{
    std::cout << "stating operation" << std::endl;
    int num_threads = std::thread::hardware_concurrency();
    std::cout << "number of threads = " << num_threads << std::endl;
    std::vector<std::thread> thread_pool;
    for (int i = 0; i < num_threads; i++)
    {
        thread_pool.push_back(std::thread(&Function_pool::infinite_loop_func, &func_pool));
    }

    //here we should send our functions
    for (int i = 0; i < 50; i++)
    {
        func_pool.push(example_function);
    }
    func_pool.done();
    for (unsigned int i = 0; i < thread_pool.size(); i++)
    {
        thread_pool.at(i).join();
    }
}
于 2018-07-18T10:46:41.803 回答
10

您可以使用boost 库中的thread_pool

void my_task(){...}

int main(){
    int threadNumbers = thread::hardware_concurrency();
    boost::asio::thread_pool pool(threadNumbers);

    // Submit a function to the pool.
    boost::asio::post(pool, my_task);

    // Submit a lambda object to the pool.
    boost::asio::post(pool, []() {
      ...
    });
}

您还可以使用来自开源社区的线程池:

void first_task() {...}    
void second_task() {...}

int main(){
    int threadNumbers = thread::hardware_concurrency();
    pool tp(threadNumbers);

    // Add some tasks to the pool.
    tp.schedule(&first_task);
    tp.schedule(&second_task);
}
于 2019-06-20T08:41:33.580 回答
4

像这样的东西可能会有所帮助(取自工作应用程序)。

#include <memory>
#include <boost/asio.hpp>
#include <boost/thread.hpp>

struct thread_pool {
  typedef std::unique_ptr<boost::asio::io_service::work> asio_worker;

  thread_pool(int threads) :service(), service_worker(new asio_worker::element_type(service)) {
    for (int i = 0; i < threads; ++i) {
      auto worker = [this] { return service.run(); };
      grp.add_thread(new boost::thread(worker));
    }
  }

  template<class F>
  void enqueue(F f) {
    service.post(f);
  }

  ~thread_pool() {
    service_worker.reset();
    grp.join_all();
    service.stop();
  }

private:
  boost::asio::io_service service;
  asio_worker service_worker;
  boost::thread_group grp;
};

你可以像这样使用它:

thread_pool pool(2);

pool.enqueue([] {
  std::cout << "Hello from Task 1\n";
});

pool.enqueue([] {
  std::cout << "Hello from Task 2\n";
});

请记住,重新发明一种高效的异步排队机制并非易事。

Boost::asio::io_service 是一个非常有效的实现,或者实际上是特定于平台的包装器的集合(例如,它包装了Windows 上的I/O 完成端口)。

于 2016-01-27T13:08:18.210 回答
3

编辑:这现在需要 C++17 和概念。(截至 2016 年 9 月 12 日,只有 g++ 6.0+ 就足够了。)

但是,模板推导因此更加准确,因此值得努力获得更新的编译器。我还没有找到需要显式模板参数的函数。

它现在还接受任何适当的可调用对象(并且仍然是静态类型安全的!!!)。

它现在还包括一个使用相同 API 的可选绿色线程优先级线程池。不过,此类仅适用于 POSIX。它使用ucontext_tAPI 进行用户空间任务切换。


我为此创建了一个简单的库。下面给出一个使用示例。(我之所以回答这个问题,是因为这是我在决定有必要自己编写之前发现的事情之一。)

bool is_prime(int n){
  // Determine if n is prime.
}

int main(){
  thread_pool pool(8); // 8 threads

  list<future<bool>> results;
  for(int n = 2;n < 10000;n++){
    // Submit a job to the pool.
    results.emplace_back(pool.async(is_prime, n));
  }

  int n = 2;
  for(auto i = results.begin();i != results.end();i++, n++){
    // i is an iterator pointing to a future representing the result of is_prime(n)
    cout << n << " ";
    bool prime = i->get(); // Wait for the task is_prime(n) to finish and get the result.
    if(prime)
      cout << "is prime";
    else
      cout << "is not prime";
    cout << endl;
  }  
}

您可以传递async具有任何(或 void)返回值和任何(或没有)参数的任何函数,它将返回相应的std::future. 要获得结果(或只是等到任务完成),您需要调用get()未来。

这是 github:https ://github.com/Tyler-Hardin/thread_pool 。

于 2014-06-15T18:57:48.050 回答
0

看起来线程池是非常流行的问题/练习:-)

我最近用现代 C++ 写了一个;它归我所有,可在此处公开获取 - https://github.com/yurir-dev/threadpool

它支持模板化的返回值、核心固定、某些任务的排序。两个 .h 文件中的所有实现。

所以,原来的问题是这样的:

#include "tp/threadpool.h"

int arr[5] = { 0 };

concurency::threadPool<void> tp;
tp.start(std::thread::hardware_concurrency());

std::vector<std::future<void>> futures;
for (int i = 0; i < 8; ++i) { // for 8 iterations,
    for (int j = 0; j < 4; ++j) {
        futures.push_back(tp.push([&arr, j]() {
               arr[j] += 2;
            }));
    }
}

// wait until all pushed tasks are finished.
for (auto& f : futures)
    f.get();
// or just tp.end(); // will kill all the threads

arr[4] = *std::min_element(arr, arr + 4);
于 2022-01-14T23:13:19.363 回答