1

下面是一个使用线程安全无界队列的非常简单的生产者/消费者问题示例。谁能解释一下为什么这段代码在使用 GNU C++ 编译时表现正确,而消费者线程在使用 LLVM C++ 编译时会随机放弃?

#include <iostream>
#include <queue>
#include <math.h>
#include <time.h>
#include "boost/thread/condition_variable.hpp"
#include "boost/thread.hpp"

//
// THREAD SAFE QUEUE
//
template<typename Data>
class Concurrent_Queue
{
private:
    std::queue<Data> the_queue;
    mutable boost::mutex the_mutex;
    boost::condition_variable the_condition_variable;
public:
    void push(Data const& data)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        the_queue.push(data);
        lock.unlock();
        printf("\n...just pushed, waking a thread...\n\n");
        the_condition_variable.notify_one();
    }

    bool empty() const
    {
        boost::mutex::scoped_lock lock(the_mutex);
        return the_queue.empty();
    }

    bool try_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        if(the_queue.empty())
        {
            return false;
        }

        popped_value=the_queue.front();
        the_queue.pop();
        return true;
    }

    void wait_and_pop(Data& popped_value)
    {
        boost::mutex::scoped_lock lock(the_mutex);
        while(the_queue.empty())
        {
            printf("\n...buffer empty, waiting to pop...\n\n");
            the_condition_variable.wait(lock);
        }

        popped_value=the_queue.front();
        the_queue.pop();
    }

    int len() {
        boost::mutex::scoped_lock lock(the_mutex);
        return (int)the_queue.size();
    }

};

//
// PRODUCER
//    
class Producer {
private:
    Concurrent_Queue<int> *buff;
    int next;
public:
    Producer(Concurrent_Queue<int> *q): buff(q) { printf("Prod up!\n"); }
    ~Producer() {}
    void run() {
        int wait_time = 0;
        while(1) {
            wait_time = (rand()%5)+1;
            sleep(wait_time);
            printf("wait_time: %d\n", wait_time);
            buff->push(wait_time);
            printf("buffer_len: %d\n", buff->len());
        }
    }
};

//
// CONSUMER 
//    
class Consumer {
private:
    Concurrent_Queue<int> * buff;
public:
    Consumer(Concurrent_Queue<int> *q): buff(q) { printf("Con up!\n"); }
    ~Consumer() {}
        void run() {
        unsigned wait_time = 0;
        int latest = 0;
        while(1) {
            wait_time = (rand()%7)+1;
            sleep(wait_time);
            buff->wait_and_pop(latest);
            printf("latest consumed int: %d\n", latest);
            printf("cons buff_len: %d\n", buff->len());
        }
    }
};

//
//  MAIN
//
int main(int argc, const char * argv[])
{
    srand((unsigned)time(NULL));
    Concurrent_Queue<int> Con_Q;
    Consumer taker(&Con_Q);
//  sleep(3);
    Producer giver(&Con_Q);
    boost::thread* prod_thread = new boost::thread(boost::bind(&Producer::run, &giver));
    boost::thread* cons_thread = new boost::thread(boost::bind(&Consumer::run, &taker));

    prod_thread->join();
    cons_thread->join();
}
4

1 回答 1

2

您应该将通知调用移到互斥锁下。

这记录在 pthreads(7) 联机帮助页的某处。我会试着找到它。

更新我目前能找到的最相关的报价是:

pthread_cond_broadcast()orpthread_cond_signal()函数可以被线程调用,无论它当前是否拥有线程调用的互斥锁或pthread_cond_wait()pthread_cond_timedwait()等待期间与条件变量关联的互斥锁;但是,如果需要可预测的调度行为,则该互斥锁应由调用pthread_cond_broadcast()或的线程锁定pthread_cond_signal()

如果当前没有线程阻塞在 cond 上,pthread_cond_broadcast()andpthread_cond_signal()函数将无效。

我知道像 Helgrind 这样的线程检查工具会抱怨如果在锁之外发出条件信号。

旁注:

  • 前几天碰巧写了一个带任务队列的thread_pool,也支持shutdown。您可以尝试这是否会在您的 Mac 上出现以下症状:

  • bool empty() const并不是很有用,因为它是一个匆忙的电话。如果将锁转移给调用者,它只会是线程安全的

  • int len() const有同样的问题
  • 您可以使用谓词版本cv::wait()来获得更清晰的代码:

    void wait_and_pop(Data& popped_value)
    {
        namespace phx = boost::phoenix;
    
        boost::unique_lock<boost::mutex> lock(the_mutex);
    
        //if (the_queue.empty()) printf("\n...buffer empty, waiting to pop...\n\n");
    
        the_condition_variable.wait(lock, !phx::bind(&queue_t::empty, phx::ref(the_queue)));
    
        popped_value = the_queue.front();
        the_queue.pop();
    }
    
  • 我更喜欢使用与 c++11 类似的接口(unique_lock<>over mutex::scoped_lock),这样更容易切换。

  • 制作人有一个未使用的字段next- 我删除了它

这是我的版本,稍作修改,因此您可以复制/粘贴以检查 MacOS(我没有 Mac):

#include <iostream>
#include <queue>
#include "boost/thread.hpp"
#include "boost/phoenix.hpp"

//
// THREAD SAFE QUEUE
//
template<typename Data>
class Concurrent_Queue
{
private:
    typedef std::queue<Data> queue_t;
    queue_t the_queue;

    mutable boost::mutex the_mutex;
    boost::condition_variable the_condition_variable;
public:
    void push(Data const& data)
    {
        boost::lock_guard<boost::mutex> lock(the_mutex);
        the_queue.push(data);

        printf("\n...just pushed, waking a thread...\n\n");
        the_condition_variable.notify_one();
    }

#ifdef UNUSED_CODE
    bool empty() const
    {
        boost::lock_guard<boost::mutex> lock(the_mutex);
        return the_queue.empty();
    }

    bool try_pop(Data& popped_value)
    {
        boost::lock_guard<boost::mutex> lock(the_mutex);
        if(the_queue.empty())
        {
            return false;
        }

        popped_value=the_queue.front();
        the_queue.pop();
        return true;
    }
#endif

    void wait_and_pop(Data& popped_value)
    {
        namespace phx = boost::phoenix;

        boost::unique_lock<boost::mutex> lock(the_mutex);

        //if (the_queue.empty()) printf("\n...buffer empty, waiting to pop...\n\n");

        the_condition_variable.wait(lock, !phx::bind(&queue_t::empty, phx::ref(the_queue)));

        popped_value = the_queue.front();
        the_queue.pop();
    }

    std::size_t len() {
        boost::lock_guard<boost::mutex> lock(the_mutex);
        return the_queue.size();
    }

};

//
// PRODUCER
//    
class Producer {
private:
    Concurrent_Queue<int> &buff;
public:
    Producer(Concurrent_Queue<int> &q): buff(q) { printf("Prod up!\n"); }
    ~Producer() {}
    void run() {
        int wait_time = 0;
        while(1) {
            wait_time = (rand()%5)+1;
            boost::this_thread::sleep_for(boost::chrono::seconds(wait_time));
            printf("wait_time: %d\n", wait_time);
            buff.push(wait_time);
            printf("buffer_len: %lu\n", buff.len());
        }
    }
};

//
// CONSUMER 
//    
class Consumer {
private:
    Concurrent_Queue<int> & buff;
public:
    Consumer(Concurrent_Queue<int> &q): buff(q) { printf("Con up!\n"); }
    ~Consumer() {}
        void run() {
        unsigned wait_time = 0;
        int latest = 0;
        while(1) {
            wait_time = (rand()%7)+1;
            boost::this_thread::sleep_for(boost::chrono::seconds(wait_time));
            buff.wait_and_pop(latest);
            printf("latest consumed int: %d\n", latest);
            printf("cons buff_len: %lu\n", buff.len());
        }
    }
};

//
//  MAIN
//
int main()
{
    srand((unsigned)time(NULL));
    Concurrent_Queue<int> Con_Q;

    Consumer taker(Con_Q);
    //boost::this_thread::sleep_for(boost::chrono::seconds(3));
    Producer giver(Con_Q);

    boost::thread_group group;
    group.create_thread(boost::bind(&Producer::run, &giver));
    group.create_thread(boost::bind(&Consumer::run, &taker));

    group.join_all();
}
于 2014-04-03T12:20:04.183 回答