6

我想创建一个用于实验目的的线程池(以及有趣的因素)。它应该能够处理各种各样的任务(所以我可以在以后的项目中使用它)。


在我的线程池类中,我将需要某种任务队列。由于标准库std::packaged_task从 C++11 标准开始提供,我的队列看起来像std::deque<std::packaged_task<?()> > task_queue,因此客户端可以std::packaged_task通过某种公共接口函数将 s 推送到队列中(然后池中的一个线程将收到条件变量来执行它等)。


我的问题与std::packaged_task<?()>双端队列中 s 的模板参数有关。

函数签名?()应该能够处理任何类型/数量的参数,因为客户端可以执行以下操作:

std::packaged_task<int()> t(std::bind(factorial, 342)); thread_pool.add_task(t);

所以我不必处理参数的类型/数量。

但是返回值应该是什么?(因此是问号)

  • 如果我将整个线程池类设为模板类,它的一个实例将只能处理具有特定签名(如std::packaged_task<int()>)的任务。

    我希望一个线程池对象能够处理任何类型的任务。

  • 如果我继续使用std::packaged_task<void()>并且调用的函数返回一个整数或任何东西,那么这就是未定义的行为。

4

2 回答 2

9

所以困难的部分是packaged_task<R()>只能移动,否则你可以把它扔进 a std::function<void()>,然后在你的线程中运行它们。

有几种方法可以解决这个问题。

首先,可笑的是,使用 apackaged_task<void()>来存储 a packaged_task<R()>。我建议不要这样做,但它确实有效。;) (operator()on的签名是packaged_task<R()>什么?传递给的对象所需的签名是什么packaged_task<void()>?)

其次,将您的包装packaged_task<R()>在 a 中shared_ptr,将其捕获在带有签名的 lambda 中void(),将其存储在 a 中std::function<void()>,然后完成。这有间接成本,但可能低于第一个解决方案。

最后,编写您自己的仅移动函数包装器。对于签名void(),它很短:

struct task {
  template<class F,
    class dF=std::decay_t<F>,
    class=decltype( std::declval<dF&>()() )
  >
  task( F&& f ):
    ptr(
      new dF(std::forward<F>(f)),
      [](void* ptr){ delete static_cast<dF*>(ptr); }
    ),
    invoke([](void*ptr){
      (*static_cast<dF*>(ptr))();
    })
  {}
  void operator()()const{
    invoke( ptr.get() );
  }
  task(task&&)=default;
  task&operator=(task&&)=default;
  task()=default;
  ~task()=default;
  explicit operator bool()const{return static_cast<bool>(ptr);}
private:
  std::unique_ptr<void, void(*)(void*)> ptr;
  void(*invoke)(void*) = nullptr;
};

和简单。以上可以存储packaged_task<R()>任何类型R,并在以后调用它们。

这具有相对最小的开销——它应该比std::function,至少我见过的实现便宜——除了它不做 SBO(小缓冲区优化),它在内部而不是堆上存储小函数对象。

unique_ptr<> ptr如果需要,您可以通过小的缓冲区优化来改进容器。

于 2015-06-26T16:39:55.713 回答
4

我碰巧有一个可以做到这一点的实现。我的做事方式是将std::packaged_task对象包装在一个抽象出返回类型的结构中。将任务提交到线程池的方法返回结果的未来。

这种工作,但由于每个任务所需的内存分配,它不适合非常短且非常频繁的任务(我试图用它来并行化流体模拟的块并且开销太高了,在324 个任务的几毫秒数量级)。

关键部分是这个结构:

struct abstract_packaged_task
{
    template <typename R>
    abstract_packaged_task(std::packaged_task<R> &&task):
        m_task((void*)(new std::packaged_task<R>(std::move(task)))),
        m_call_exec([](abstract_packaged_task *instance)mutable{
            (*(std::packaged_task<R>*)instance->m_task)();
        }),
        m_call_delete([](abstract_packaged_task *instance)mutable{
            delete (std::packaged_task<R>*)(instance->m_task);
        })
    {

    }

    abstract_packaged_task(abstract_packaged_task &&other);

    ~abstract_packaged_task();

    void operator()();

    void *m_task;
    std::function<void(abstract_packaged_task*)> m_call_exec;
    std::function<void(abstract_packaged_task*)> m_call_delete;
};

如您所见,它通过使用带有std::function和 a 的lambdas 隐藏了类型依赖关系void*。如果您知道所有可能出现的std::packaged_task对象的最大大小(我还没有检查大小是否完全依赖R),您可以尝试通过删除内存分配来进一步优化它。

进入线程池的提交方法然后执行此操作:

template <typename R>
std::future<R> submit_task(std::packaged_task<R()> &&task)
{
    assert(m_workers.size() > 0);
    std::future<R> result = task.get_future();
    {
        std::unique_lock<std::mutex> lock(m_queue_mutex);
        m_task_queue.emplace_back(std::move(task));
    }
    m_queue_wakeup.notify_one();
    return result;
}

m_task_queue一个结构体std::deque在哪里abstract_packaged_taskm_queue_wakeupstd::condition_variable唤醒一个工作线程来接任务。工作线程的实现很简单:

void ThreadPool::worker_impl()
{
    std::unique_lock<std::mutex> lock(m_queue_mutex, std::defer_lock);
    while (!m_terminated) {
        lock.lock();
        while (m_task_queue.empty()) {
            m_queue_wakeup.wait(lock);
            if (m_terminated) {
                return;
            }
        }
        abstract_packaged_task task(std::move(m_task_queue.front()));
        m_task_queue.pop_front();
        lock.unlock();

        task();
    }
}

您可以在我的 github 上查看完整的源代码相应的标头。

于 2015-06-26T11:51:46.933 回答