2

我已经有了Worker类和一个Handler类来为工作创建一个抽象层。我想用std::async一些异步来混合,但我的 Visual Studio 2012(更新 1)出现了一些奇怪的行为。

我的类层次结构如下:

  • Worker是一个具有纯虚方法的抽象类InitWork
  • BasicWorker : Worker只是printf用于一些输出。
  • GroupWorker : Worker是其他工作人员的聚合。
  • Handler坚持Worker做一些工作。

然后我调用几个std::async方法,在其中创建工作程序和处理程序,在嵌套std::async调用中调用处理程序,等待工作程序的初始化(std::condition_variable此处),然后停止处理程序。

最后,我等待所有的std::futures 完成。

代码如下:

#include <stdio.h>
#include <future>
#include <array>
#include <atomic>
#include <vector>

struct Worker
{
    virtual ~Worker() { }
    virtual void Init() = 0;
    virtual void Work() = 0;
};

struct BasicWorker : public Worker
{
    virtual ~BasicWorker() { }
    virtual void Init()
    {
        printf("\t\t\t\tInit: %d\n", std::this_thread::get_id());
    }

    virtual void Work()
    {
        printf("\t\t\t\tWork: %d\n", std::this_thread::get_id());
    }
};

struct GroupWorker : public Worker
{
    GroupWorker()
    {
        workers.push_back(std::make_shared<BasicWorker>());
    }

    virtual ~GroupWorker() { }

    virtual void Init()
    {
        for(int i = 0; i < workers.size(); ++i)
        {
            workers[i]->Init();
        }
        initEvent.notify_all();
    }

    virtual void Work()
    {
        for(int i = 0; i < workers.size(); ++i)
        {
            workers[i]->Work();
        }
    }

    void WaitForInit()
    {
        //std::unique_lock<std::mutex> initLock(initMutex);
        //initEvent.wait(initLock);
    }
private:
    std::mutex initMutex;
    std::condition_variable initEvent;
    std::vector<std::shared_ptr<Worker>> workers;
};

struct Handler
{
    static const int Stopped = -1;
    static const int Ready = 0;
    static const int Running = 1;

    Handler(const std::shared_ptr<Worker>& worker) :
        worker(worker)
    { }

    void Start(int count)
    {
        int readyValue = Ready;
        if(working.compare_exchange_strong(readyValue, Running))
        {
            worker->Init();

            for(int i = 0; i < count && working == Running; ++i)
            {
                worker->Work();
            }
        }
    }

    void Stop()
    {
        working = Stopped;
    }
private:
    std::atomic<int> working;
    std::shared_ptr<Worker> worker;
};

std::future<void> Start(int jobIndex, int runCount)
{
    //printf("Start: %d\n", jobIndex);
    return std::async(std::launch::async, [=]()
    {
        printf("Async: %d\n", jobIndex);
        auto worker = std::make_shared<GroupWorker>();
        auto handler = std::make_shared<Handler>(worker);

        auto result = std::async(std::launch:async, [=]()
        {
            printf("Nested async: %d\n", jobIndex);
            handler->Start(runCount);
        });

        worker->WaitForInit();
        handler->Stop();

        result.get();
    });
}

int main()
{
    const int JobCount = 300;
    const int RunCount =  5;
    std::array<std::future<void>, JobCount> jobs;

    for(int i = 0; i < JobCount; ++i)
    {
        jobs[i] = Start(i, RunCount);
    }

    for(int i = 0; i < JobCount; ++i)
    {
        jobs[i].get();
    }
}

我的问题是:

  • 如果我取消注释函数中的行,WaitForInit@GroupWorker那么在进行所有第一级异步函数调用之前,不会进行嵌套的异步函数调用
  • 在等待std::condition_variable我增加作业数量时,新线程的创建感觉就像成倍地变慢。对于我的试验,低于 100 个工作存在一些异步,但超过 300 个则完全按顺序创建工作。
  • 然后,如果我取消注释方法中的printfStart,所有嵌套的异步都会像魅力一样工作

所以,

  • 我在使用中做错了std::condition_variable什么?
  • 为什么为 100 多个线程创建作业会变慢?(这个问题是可选的,似乎是操作系统的问题,可以用智能线程池概念来解决)
  • printf和这些有什么关系?(我尝试printf在竞争条件下删除所有调用,我在代码中放置了一个断点但没有帮助。情况std::cout也是如此)

编辑:我添加了启动策略(由 Jonathan Wakely 建议)以确保创建线程。但这也无济于事。我目前正在创建一个std::thread并调用thread::join函数以在第一级异步中等待。

4

1 回答 1

1

NB 可以调用printf,但不能假设std::thread::id可以转换为int. 您可以像这样使它更便携:

inline long tol(std::thread::id id)
{
  std::ostringstream ss;
  ss << id;
  return stol(ss.str());
}

(这仍然假设 的字符串值std::thread::id可以转换为long,这不是必需的,但比假设隐式转换为 更有可能int

我在使用 std::condition_variable 时做错了什么?

您没有正在等待的“条件”,也没有同步来确保在notify_all调用wait. 你应该有一个成员变量,上面写着“这个工人已经被初始化”,它是由 设置的Init,并且只有在条件变量不为真时才等待条件变量(该标志应该是原子的或由互斥锁保护,以防止数据竞争) .

为什么为 100 多个线程创建作业会变慢?(这个问题是可选的,似乎是操作系统的问题,可以用智能线程池概念来解决)

因为有数百个线程有很多共享资源的争用和 OS 调度程序的很大压力,所以实现可能决定开始返回延迟函数(即好像std::async被调用std::launch::deferred)而不是异步函数。您的代码假定async不会返回延迟函数,因为如果异步工作者及其嵌套的异步工作者都作为延迟函数运行,则程序可能会死锁,因为外部函数会阻塞等待嵌套的调用Init,但嵌套函数永远不会运行,直到出一通电话result.get()。您的程序不可移植,只能在 Windows 上运行,因为(如果我理解正确的话)MSVCasync使用工作窃取线程池,如果线程可用,它将运行延迟函数。这不是标准要求的。 如果您想强制每个工作人员拥有一个新线程,请使用该std::launch::async策略。

printf 与这些有什么关系?(我尝试在竞争条件下删除所有 printf 调用,我在代码中放置了一个断点但没有帮助。std::cout 也是如此)

它会带来轻微的延迟,并且可能会在线程之间产生某种形式的不可靠排序,因为它们现在正在竞争并且可能在竞争单个全局资源。施加的延迟printf可能足以让一个线程完成,这会将其资源释放到线程池并允许另一个异步工作者运行。

于 2012-12-11T00:45:17.147 回答