2

我正在尝试构建一个通用任务系统,我可以在其中发布在任何空闲线程上执行的任务。在之前的尝试中,我经常用完线程,因为它们会在某个时候阻塞。所以我正在尝试增强纤维;当一根光纤阻塞时,线程可以自由地在其他光纤上工作,听起来很完美。

工作窃取算法似乎非常适合我的目的,但我很难使用它。在示例代码中,创建了纤程,然后才创建线程和调度程序,因此所有纤程实际上都在所有线程上执行。但是我想稍后启动纤程,然后所有其他线程都被无限期地挂起,因为它们没有任何工作。我还没有找到任何方法来再次唤醒它们,我所有的纤维都只在主线程上执行。“通知”似乎是要调用的方法,但我看不到任何实际获取算法实例的方法。

我尝试保留指向算法所有实例的指针,以便调用 notify(),但这并没有真正帮助;大多数时候,工作线程中的算法不能从主线程中窃取任何东西,因为下一个是 dispatcher_context。

我可以禁用“挂起”,但线程正忙于等待,而不是一个选项。

我还尝试了 shared_work 算法。同样的问题,一旦线程找不到纤程,它就永远不会再次唤醒。我尝试了手动调用 notify() 的相同技巧,结果相同,非常不可靠。

我尝试使用通道,但是 AFAICT,如果光纤正在等待它,则当前上下文只是“跳跃”并运行等待的光纤,暂停当前的光纤。

简而言之:我发现在另一个线程上可靠地运行光纤非常困难。在分析时,大多数线程只是在等待一个 condition_variable,即使我确实创建了大量的纤维。

作为一个小测试用例,我正在尝试:

std::vector<boost::fibers::future<int>> v;

for (auto i = 0; i < 16; ++i)
    v.emplace_back(boost::fibers::async([i] {
       std::this_thread::sleep_for(std::chrono::milliseconds(1000));
       return i;
    }));

int s = 0;
for (auto &f : v)
    s += f.get();

我故意使用 this_thread::sleep_for 来模拟 CPU 繁忙。

对于 16 个线程,我希望这段代码能在 1 秒内运行,但大多数情况下它最终会是 16 秒。我能够让这个特定的示例在 1 秒内实际运行,只是在周围乱搞;但没有办法感觉“正确”,也没有办法适用于其他场景,它总是必须针对一个特定场景手工制作。

我认为这个例子应该可以像预期的那样使用 work_stealing 算法;我错过了什么?仅仅是滥用纤维吗?我怎样才能可靠地实现这一点?

谢谢,迪克斯

4

1 回答 1

1

boost.fiber 包含一个使用 work_stealing 算法的示例 (examples/work_stealing.cpp)。

  1. 您必须在应该处理/窃取纤维的每个工作线程上安装算法。 boost::fibers::use_scheduling_algorithm< boost::fibers::algo::work_stealing >( 4); // 4 worker-threads

  2. 在处理任务/纤程之前,您必须等到所有工作线程都已在算法中注册。该示例为此目的使用了屏障。

  3. 您需要知道所有工作/任务已被处理,例如使用条件变量。

看看Running with worker threads(boost 文档)。

于 2017-12-13T14:40:06.817 回答