1

我的程序有三个线程,我正在尝试学习同步和线程安全。下面我概述了不同线程的作用,但我想学习如何使用事件来触发不同线程中的每个进程,而不是无限读取(这给了我并发问题)。

谷歌搜索引发了许多选择,但我不确定在这种情况下最好实现什么 - 你能否指出我可以学习最好地实现这一点的标准方法/事件的方向?

我在 VS 2012 上这样做,理想情况下我不会使用外部库,例如 boost。

线程 1:接收消息并将其推送到全局队列中,queue<my_class> msg_in.

线程2:无限循环(即while(1));等到if (!msg_in.empty()),进行一些处理,然后将其推送到全局map<map<queue<my_class>>> msg_out.

while (1)
{
    if (!msg_in.empty())
    {
        //processes 
        msg_map[i][j].push(); //i and j are int (irrelevant here)
    }

}

线程 3:

while (1)
{
    if (msg_map.find(i) != msg_map.end())
    {
        if (!msg_map[i].find(j)->second.empty())
        {
            //processes 
        }
    }
}
4

1 回答 1

0

您的问题是生产者消费者问题。您可以为您的事件使用条件变量。这里有一个例子:http: //en.cppreference.com/w/cpp/thread/condition_variable

如果您需要,我已将其改编为您的示例。

#include "MainThread.h"


#include <iostream>
#include <string>
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>

std::mutex m;
std::condition_variable cv;
bool ready = false;
bool processed = false;

void worker_thread(unsigned int threadNum)
{
    // Wait until main() sends data
    {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, []{return ready;});
    }

    std::cout << "Worker thread "<<threadNum <<" is processing data"<<std::endl;

    // Send data back to main()
    {
        std::lock_guard<std::mutex> lk(m);
        processed = true;
        std::cout << "Worker thread "<< threadNum <<" signals data processing completed\n";
    }
    cv.notify_one();
}


int initializeData()
{
    // send data to the worker thread
    {
        std::lock_guard<std::mutex> lk(m);
        ready = true;
        std::cout << "Data initialized"<<std::endl;
    }
    cv.notify_one();
    return 0;
}

int consumerThread(unsigned int nbThreads)
{
    std::atomic<unsigned int> nbConsumedthreads=0;
    while (nbConsumedthreads<nbThreads)
    {
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, []{return processed;});
        std::cout<<"Data processed counter="<<nbConsumedthreads << " "<<  std::endl;
        ++nbConsumedthreads;
        cv.notify_one();
    }

    return 0;
}

int main()
{
    const unsigned int nbThreads=3;
    std::thread worker1(worker_thread,1);
    std::thread worker2(worker_thread,2);
    std::thread worker3(worker_thread,3);

    std::thread init(initializeData);

    std::thread consume(consumerThread, nbThreads);



    worker1.join();
    worker2.join();
    worker3.join();

    init.join();

    consume.join();

    return 0;
}

希望对您有所帮助,如果您需要更多信息,请告诉我。

于 2013-12-05T16:38:39.057 回答