有谁知道 C++ 线程池实现,它既允许并行线程(如典型的线程池),又允许背靠背串行执行顺序。我花了几天时间试图通过修改以下线程池来完成这项工作,但我似乎无法让它工作。我研究了英特尔 TBB 使用的技术,并且我研究了可能使用微软 PPL 中的概念(它的异步代理库看起来很有希望)——两者都有面向任务的技术来实现上述目标——但是不幸的是,这些解决方案将不工作我的目标 PowerPC linux 嵌入式目标。
编辑我将一个 live coliru 演示与生成线程图的源放在一起 - 并且还展示了一个很好的 scheduler_loop 示例,理论上可以等待线程完成。该代码还显示了一个带有 2 个线程的 UtlThreadPool,我在其中将并发任务提供给它 - 但是“提供”并不完全正确,需要一些工作来遍历节点。
我用来制作执行图的数据结构如下所示。它使用PriorityNode 数据结构。这个结构本质上是一个PriorityNodes的链表,每一个都包含一个PriorityLevel的向量可以并发运行的任务和指向下一个 PriorityNode 的指针,该指针指示以后要串行运行的线程。一旦这些都完成了,如果 mNextNode 成员不是 nullptr,那么这应该安排在线程池中运行(依此类推,直到 mNextNode 为 nullptr。通过这个 PriorityNodes 链表排序是我想要的线程池以通过其线程进行排序。PriorityNode 具有插入运算符,通常产生如下输出。(这意味着 1A1 可以与 1A2 并发运行,并且当这两个线程都完成时,下一个 PriorityNode 将允许 1B1、1B2、1B3 和1B4 并发运行 - 在池可用的线程数上。
1A1
1A2
+-1B1
+-1B2
+-1B3
+-1B4
我似乎最接近这个问题的解决方案 - 再次注意它是英特尔特定的,我在电源 PC 上是英特尔 TBB -这是他们用于串行执行顺序的示例。
/**
* Branch representing fundamental building block of
* a priority tree containing szPriority entries.<p>
*
* Each priority tree struct contains a vector of concurrent
* priorities that can be scheduled to run in the thread pool -
* note that the thread pool must have no entries associated
* with the current channel running before enqueueing these
* tasks. The application must wait for the thread pool to
* complete these tasks before queuing up the dependent tasks
* described in the mNextNode smart pointer. If mNextNode is
* unassigned (nullptr), then we have reached the end of the
* tree.
*/
struct PriorityNode {
explicit PriorityNode(
const std::vector<PriorityLevel>& rConcurrent,
const std::shared_ptr<PriorityNode>& rNext = std::shared_ptr<PriorityNode>(),
const size_t& rDepth = 0)
: mConcurrent(rConcurrent)
, mNextNode(rNext)
, mDepth(rDepth)
{}
/**
* Stream insert operator<p>
*
* @param os [in,out] output stream
* @param rhs [in] PriorityLevel to send to the output
* stream.
*
* @return a reference to the updated stream
*/
inline friend std::ostream& operator << (
std::ostream& os, const PriorityNode& rhs) {
// indent 2 spaces per depth level
std::string indent = rhs.mDepth > 0 ?
(std::string("+") +
std::string((rhs.mDepth * 2) - 1, '-')) :
std::string();
// print out the concurrent threads that
// can be scheduled with the thread pool
for (const auto& next : rhs.mConcurrent) {
os << indent << next << std::endl;
}
// print the dependent priorities that can only
// be scheduled when the concurrent ones are finished
if (rhs.mNextNode) {
os << *rhs.mNextNode << std::endl;
}
return os;
}
// these are all equivalent thread priorities
// that can be run simultaneously
std::vector<PriorityLevel> mConcurrent;
// these are concurrent threads that must be AFTER all
// mConcurrent tasks have completed (exiting the thread pool)
std::shared_ptr<PriorityNode> mNextNode;
// recursion depth
size_t mDepth;
};