2

有谁知道 C++ 线程池实现,它既允许并行线程(如典型的线程池),又允许背靠背串行执行顺序。我花了几天时间试图通过修改以下线程池来完成这项工作,但我似乎无法让它工作。我研究了英特尔 TBB 使用的技术,并且我研究了可能使用微软 PPL 中的概念(它的异步代理库看起来很有希望)——两者都有面向任务的技术来实现上述目标——但是不幸的是,这些解决方案将不工作我的目标 PowerPC linux 嵌入式目标。

编辑我将一个 live coliru 演示与生成线程图的源放在一起 - 并且还展示了一个很好的 scheduler_loop 示例,理论上可以等待线程完成。该代码还显示了一个带有 2 个线程的 UtlThreadPool,我在其中将并发任务提供给它 - 但是“提供”并不完全正确,需要一些工作来遍历节点。

我用来制作执行图的数据结构如下所示。它使用PriorityNode 数据结构。这个结构本质上是一个PriorityNodes的链表,每一个都包含一个PriorityLevel的向量可以并发运行的任务和指向下一个 PriorityNode 的指针,该指针指示以后要串行运行的线程。一旦这些都完成了,如果 mNextNode 成员不是 nullptr,那么这应该安排在线程池中运行(依此类推,直到 mNextNode 为 nullptr。通过这个 PriorityNodes 链表排序是我想要的线程池以通过其线程进行排序。PriorityNode 具有插入运算符,通常产生如下输出。(这意味着 1A1 可以与 1A2 并发运行,并且当这两个线程都完成时,下一个 PriorityNode 将允许 1B1、1B2、1B3 和1B4 并发运行 - 在池可用的线程数上。

1A1
1A2
+-1B1
+-1B2
+-1B3
+-1B4

我似乎最接近这个问题的解决方案 - 再次注意它是英特尔特定的,我在电源 PC 上是英特尔 TBB -是他们用于串行执行顺序的示例。

/**
 * Branch representing fundamental building block of
 * a priority tree containing szPriority entries.<p>
 *
 * Each priority tree struct contains a vector of concurrent
 * priorities that can be scheduled to run in the thread pool -
 * note that the thread pool must have no entries associated
 * with the current channel running before enqueueing these
 * tasks. The application must wait for the thread pool to
 * complete these tasks before queuing up the dependent tasks
 * described in the mNextNode smart pointer. If mNextNode is
 * unassigned (nullptr), then we have reached the end of the
 * tree.
 */
struct PriorityNode {
    explicit PriorityNode(
        const std::vector<PriorityLevel>& rConcurrent,
        const std::shared_ptr<PriorityNode>& rNext = std::shared_ptr<PriorityNode>(),
        const size_t& rDepth = 0)
        : mConcurrent(rConcurrent)
        , mNextNode(rNext)
        , mDepth(rDepth)
    {}

    /**
    * Stream insert operator<p>
    *
    * @param os     [in,out] output stream
    * @param rhs    [in] PriorityLevel to send to the output
    *               stream.
    *
    * @return a reference to the updated stream
    */
    inline friend std::ostream& operator << (
        std::ostream& os, const PriorityNode& rhs) {
        // indent 2 spaces per depth level
        std::string indent = rhs.mDepth > 0 ?
            (std::string("+") +
            std::string((rhs.mDepth * 2) - 1, '-')) :
            std::string();
        // print out the concurrent threads that 
        // can be scheduled with the thread pool
        for (const auto& next : rhs.mConcurrent) {
            os << indent << next << std::endl;
        }
        // print the dependent priorities that can only
        // be scheduled when the concurrent ones are finished
        if (rhs.mNextNode) {
            os << *rhs.mNextNode << std::endl;
        }
        return os;
    }
    // these are all equivalent thread priorities
    // that can be run simultaneously
    std::vector<PriorityLevel> mConcurrent;

    // these are concurrent threads that must be AFTER all 
    // mConcurrent tasks have completed (exiting the thread pool)
    std::shared_ptr<PriorityNode> mNextNode;

    // recursion depth
    size_t mDepth;
};
4

2 回答 2

2

为什么不在 PowerPC 上使用 TBB?它是一个高度可移植的库,旨在尽可能地跨平台;我听说它正在被 TBB 开源社区移植到 BlueGen 上。您可以在英特尔 TBB 论坛上询问他们,例如,通过恢复此论坛主题

英特尔不为 TBB 分发 PowerPC 二进制文件,但您可以尝试从源代码构建它,只需通过

做tbb

另请参阅这些社区补丁

于 2014-09-26T19:54:48.023 回答
0

如果有人还在寻找这个,请在​​此处查看 repo - https://github.com/hirak99/ordered_thread_pool

基本上有了这个你可以替换这样的代码 -

while (...) {
  std::cout << CostlyFn(input)) << std::endl;
}

进入这个——

OrderedThredPool<std::string> pool{10, 1};
while (...) {
  pool.Do(
    [&input] { return CostlyFn(input); },
    [](const std::string& out) {
      std::cout << out << std::endl;
    });
}

它会按顺序执行。

于 2020-08-29T23:05:51.240 回答