6

我在使用 pthreads 时遇到问题,我认为我遇到了死锁。我创建了一个我认为可以正常工作的阻塞队列,但是在进行了更多测试后,我发现如果我尝试取消阻塞在 blocking_queue 上的多个线程,我似乎遇到了死锁。

阻塞队列非常简单,看起来像这样:

template <class T> class Blocking_Queue
{
public:
    Blocking_Queue()
    {
        pthread_mutex_init(&_lock, NULL);
        pthread_cond_init(&_cond, NULL);
    }

    ~Blocking_Queue()
    {
        pthread_mutex_destroy(&_lock);
        pthread_cond_destroy(&_cond);
    }

    void put(T t)
    {
        pthread_mutex_lock(&_lock);
        _queue.push(t);
        pthread_cond_signal(&_cond);
        pthread_mutex_unlock(&_lock);
    }

     T pull()
     {
        pthread_mutex_lock(&_lock);
        while(_queue.empty())
        {
            pthread_cond_wait(&_cond, &_lock);
        }

        T t = _queue.front();
        _queue.pop();

        pthread_mutex_unlock(&_lock);

        return t;
     }

priavte:
    std::queue<T> _queue;
    pthread_cond_t _cond;
    pthread_mutex_t _lock;
}

为了测试,我创建了 4 个线程来拉这个阻塞队列。我在阻塞队列中添加了一些打印语句,每个线程都进入 pthread_cond_wait() 方法。但是,当我尝试在每个线程上调用 pthread_cancel() 和 pthread_join() 时,程序就会挂起。

我也只用一个线程对此进行了测试,并且效果很好。

根据文档, pthread_cond_wait() 是一个取消点,因此在这些线程上调用取消应该会导致它们停止执行(这仅适用于 1 个线程)。但是 pthread_mutex_lock 不是取消点。在调用 pthread_cancel() 时是否会发生某些事情,被取消的线程在终止之前获取互斥锁并且不解锁它,然后当下一个线程被取消时它无法获取互斥锁和死锁?还是我做错了什么。

任何建议都会很可爱。谢谢 :)

4

3 回答 3

5

pthread_cancel()最好避免。

您可以通过从那里抛出异常来解除阻塞阻塞在 Blocking_Queue::pull() 上的所有线程。

队列中的一个弱点是T t = _queue.front();调用可能引发异常的 T 的复制构造函数,从而使您的队列互斥锁永远锁定。更好地使用 C++ 范围锁。

这是一个优雅的线程终止示例:

$ cat test.cc
#include <boost/thread/mutex.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/condition_variable.hpp>
#include <exception>
#include <list>
#include <stdio.h>

struct BlockingQueueTerminate
    : std::exception
{};

template<class T>
class BlockingQueue
{
private:
    boost::mutex mtx_;
    boost::condition_variable cnd_;
    std::list<T> q_;
    unsigned blocked_;
    bool stop_;

public:
    BlockingQueue()
        : blocked_()
        , stop_()
    {}

    ~BlockingQueue()
    {
        this->stop(true);
    }

    void stop(bool wait)
    {
        // tell threads blocked on BlockingQueue::pull() to leave
        boost::mutex::scoped_lock lock(mtx_);
        stop_ = true;
        cnd_.notify_all();

        if(wait) // wait till all threads blocked on the queue leave BlockingQueue::pull()
            while(blocked_)
                cnd_.wait(lock);
    }

    void put(T t)
    {
        boost::mutex::scoped_lock lock(mtx_);
        q_.push_back(t);
        cnd_.notify_one();
    }

    T pull()
    {
        boost::mutex::scoped_lock lock(mtx_);

        ++blocked_;
        while(!stop_ && q_.empty())
            cnd_.wait(lock);
        --blocked_;

        if(stop_) {
            cnd_.notify_all(); // tell stop() this thread has left
            throw BlockingQueueTerminate();
        }

        T front = q_.front();
        q_.pop_front();
        return front;
    }
};

void sleep_ms(unsigned ms)
{
    // i am using old boost
    boost::thread::sleep(boost::get_system_time() + boost::posix_time::milliseconds(ms));
    // with latest one you can do this
    //boost::thread::sleep(boost::posix_time::milliseconds(10));
}

void thread(int n, BlockingQueue<int>* q)
try
{
    for(;;) {
        int m = q->pull();
        printf("thread %u: pulled %d\n", n, m);
        sleep_ms(10);
    }
}
catch(BlockingQueueTerminate&)
{
    printf("thread %u: finished\n", n);
}

int main()
{
    BlockingQueue<int> q;

    // create two threads
    boost::thread_group tg;
    tg.create_thread(boost::bind(thread, 1, &q));
    tg.create_thread(boost::bind(thread, 2, &q));
    for(int i = 1; i < 10; ++i)
        q.put(i);
    sleep_ms(100); // let the threads do something
    q.stop(false); // tell the threads to stop
    tg.join_all(); // wait till they stop
}

$ g++ -pthread -Wall -Wextra -o test -lboost_thread-mt test.cc

$ ./test
thread 2: pulled 1
thread 1: pulled 2
thread 1: pulled 3
thread 2: pulled 4
thread 1: pulled 5
thread 2: pulled 6
thread 1: pulled 7
thread 2: pulled 8
thread 1: pulled 9
thread 2: finished
thread 1: finished
于 2011-02-16T16:55:15.617 回答
1

我对 pthread_cond_wait() / pthread_cancel() 有过类似的经验。在线程由于某种原因返回后,我遇到了锁仍然被持有的问题,并且无法解锁它,因为您必须在锁定时在同一个线程中解锁。我在执行 pthread_mutex_destroy() 时注意到了这些错误,因为我有一个单一的生产者,单一的消费者情况,所以没有发生死锁。

pthread_cond_wait() 应该在返回时锁定互斥锁,这可能已经发生,但最终解锁没有通过,因为我们强行取消了线程。为了安全起见,我通常尽量避免完全使用 pthread_cancel() ,因为某些平台甚至不支持这一点。您可以使用 volatile bool 或 atomics 并检查是否应关闭线程。这样,互斥体也将被干净地处理。

于 2011-02-16T16:17:09.610 回答
1

我对 pthread_cancel() 不太熟悉 - 我更喜欢合作终止。

pthread_cancel() 不会让您的互斥锁锁定吗?我想您需要使用取消处理程序进行清理。

于 2011-02-16T16:12:39.597 回答