8

我目前正在研究一个模拟扩展 Producer-Worker 模型的问题。在这个问题中,有 3 名工人和 3 种工具可用,而工人要工作,他们需要 2 种工具(和材料,但这些无关紧要)。如果保险库中有 >=2 个工具,工人将拿 2 个。否则,他们将等待一个条件变量,当有 >=2 时将发出信号。

这对 2 名工人来说很好:一名将工作然后将工具归还到保险库,另一名等待的工人将被唤醒并拿走 2 件工具。问题是,有 3 名工人,总会有一个人饿着要得到工具。

经过一些测试,我注意到等待条件变量的线程是以堆栈形式构造的。有没有可能让它排队?(1等,2等,3等。1醒了要再造,要在2和3后面等。)

这是一个示例输出。代码太长了,如果真的需要,我会发布它。有 3 个工作线程和 1 个工具互斥锁。挨饿的人每跑一次都不一样。

1 Tools taken. Remaining: 1
2 Waiting on tools...
3 Waiting on tools...
1 Operator Product made. Tools returned. Tools now:3
3 Tools taken. Remaining: 1
1 Waiting on tools...
3 Materials returned for switch.
3 Operator Product made. Tools returned. Tools now:3
1 Tools taken. Remaining: 1
3 Waiting on tools...
1 Materials returned for switch.
1 Operator Product made. Tools returned. Tools now:3
3 Tools taken. Remaining: 1
1 Waiting on tools...
3 Materials returned for switch.
3 Operator Product made. Tools returned. Tools now:3
1 Tools taken. Remaining: 1
3 Waiting on tools...
1 Materials returned for switch.
1 Operator Product made. Tools returned. Tools now:3
3 Tools taken. Remaining: 1
1 Waiting on tools...
3 Materials returned for switch.
3 Operator Product made. Tools returned. Tools now:3
1 Tools taken. Remaining: 1
3 Waiting on tools...
1 Materials returned for switch.
...

(如您所见,2 从未获得工具...)

更新:2013/07/05 我添加了一些代码。

int tools = 3; //global
string last; //current last product on output buffer
mutex toolsMutex;
mutex matSearchMutex;

int main(){
//Initializing Producers
    Producer prod1(1);
    Producer prod2(2);
        Producer prod3(3);



    thread p1(processor,1);
    thread p2(processor,2);
    thread p3(processor,3);

    p1.detach();
    p2.detach();
    p3.detach();

    while(true){//forever running

    }

    return 0;
}

处理器:

  //Processor method
void processor(int i){
    srand(time(NULL)); 

    while (true){ //forever running


    bool hasTools = false;
    bool productMade = false;
    while (productMade == false){ //while product has yet to be made.
        //choose what to make...



        if (hasTools == false){
            thread matT(getMaterials,whatToMake);
            thread toolT(getTools,i);
            toolT.join();           
            matT.join();
            hasTools = true;
        }
        else{ //tools acquired but no materials
            thread matT(getMaterials,whatToMake);
            matT.join();
        }

        if (recordedLast.compare(last) != 0){

            //return materials and acquire new ones the next run

            continue;
        }
        else {
            makeProduct(whatToMake);
            unique_lock<mutex> locker(toolMutex); 
            tools = tools + 2;
            cout << i << " Operator Product made. Tools returned. Tools now:" << tools << endl;
            productMade = true;
            if (tools >=2)  toolsCV.notify_one();
        }

    //done processing

    }


}   

}

制作产品:

void makeProduct(int i){
    unique_lock<mutex> mainMatLock(matSearchMutex); 
    // make product according to i
    this_thread::sleep_for(chrono::milliseconds(rand() % 1000 + 10));   
}

获取工具:

void getTools(int i){
    unique_lock<mutex> locker(toolMutex); 
    if (tools <2){
        cout << i << " Waiting on tools..." << endl;
        toolsCV.wait(locker);}
    tools = tools - 2;//tools acquired
    cout << i <<" Tools taken. Remaining: " << tools << endl;

}

感谢那些已经回复的人。今晚我将尝试使用多个条件变量来实现等待队列。

(PS 有没有更好的方法在 Stack Overflow 上进行代码格式化?除了四个空格...

4

4 回答 4

13

std::condition_variable没有指定调用时唤醒哪个等待线程notify_one。因此,您应该编写不关心唤醒哪个线程的代码。标准模式是,无论哪个线程被唤醒,该线程都应该完成需要完成的工作。

如果您要求以特定顺序唤醒线程,则使用不同的机制。例如,您可以std::condition_variable为每个线程设置一个单独的线程,然后在线程需要工具时将它们放入队列中。当一个线程提交工具时,它可以向队列前面的线程对应的条件变量发出信号。然后该线程将被唤醒,而其他线程将保持睡眠状态(模数虚假唤醒)。

于 2013-07-02T10:17:44.280 回答
3

当有多个线程等待一个条件时,它们被唤醒的顺序(notify_all)或唤醒的顺序(notify_one)是未指定的。如果你需要某种排序,你需要使用notify_all,并自己实现它。您可以保留一个等待线程的队列:在等待之前(但在获取互斥体之后),将线程 id 推送到队列的末尾。在循环中,循环“队列前面的这个线程和可用的必要工具”。拿到工具后,把队列前面的id去掉,然后 notify_all再调用。

于 2013-07-02T08:28:53.560 回答
1

一种方法可能是使用在线程之间共享的完全成熟的信号量而不是条件变量。这样,您可以等待特定的计数。

#include <mutex>
#include <thread>
#include <condition_variable>

using std::mutex;
using std::condition_variable;

class Semaphore
{
public:
    /**
     * Construct a counting semaphore with an initial value
     * @param cnt The value of the initial semaphore count
     */
    Semaphore(unsigned int cnt);

    /**
     * acquire a semaphore count
     * @param numRes The number of count ressources to acquire
     */
    void acquire(unsigned int numRes = 1);

    /**
     * try to acquire a semaphore count.
     * @param numRes The number of count ressources that the method tries to acquire
     * @return true, if numRes could be aquired
     *         false, otherwise
     */
    bool tryAcquire(unsigned int numRes = 1);

    /**
     * release one semaphore cnt
     * @param numRes The number of count ressources to release
     */
    void release(unsigned int numRes = 1);

private:
    unsigned int cnt;
    mutex mut;
    condition_variable cond;
};

实现看起来像:

void Semaphore::acquire(unsigned int numRes)
{
    unique_lock<mutex> lock(mut);
    while (cnt < numRes)
    {
        cond.wait(lock);
    }

    cnt-=numRes;
}

bool Semaphore::tryAcquire(unsigned int numRes)
{
    unique_lock<mutex> lock(mut);
    if (cnt>=numRes)
    {
        cnt -= numRes;
        return true;
    }
    return false;
}

void Semaphore::release(unsigned int numRes)
{
    {
        unique_lock<mutex> lock(mut);
        cnt += numRes;
    }
    // notify <numRes> waiting entities
    for (unsigned int i = 0; i<numRes; ++i)
    {
        cond.notify_one(); 
    }
}
于 2013-07-02T09:41:22.780 回答
1

这里真正的问题是,如果您有工作线程和有限数量的所需资源,您不应该关心实际激活了哪些线程,您应该只关心工作是否完成。这里唯一的区别在于日志记录。您定义的线程数是可以并行运行的线程数,受资源限制为1。

如果这对您来说不合适,那么您需要按照其他答案中的说明自行采取行动。

于 2013-07-02T10:01:26.940 回答