0

这种情况总是经常发生:我们有一些线程和一个共享对象,我们需要确保任何时候只有一个线程可以修改该对象。

那么,显而易见的解决方案是使用lock the door-do the job-get out of there成语。在这种情况下,我总是使用 POSIX 互斥锁。例如

pthread_mutex_lock(&this->messageRW);     // lock the door
P_Message x = this->messageQueue.front(); // do the job
this->messageQueue.pop(); 
pthread_mutex_unlock(&this->messageRW);   // get out of there
// somewhere else, in another thread
while (true) {
    P_Message message;
    solver->listener->recvMessage(message);
    pthread_mutex_lock(&(solver->messageRW));     // lock the door
    solver->messageQueue.push(message);           // do the job
    pthread_mutex_unlock(&(solver->messageRW));   // get out of there
    sem_post(&solver->messageCount);
}

messageQueue在代码中的很多地方都使用过。所以最终得到了很多不优雅的锁定/解锁对。我认为应该有一种方法可以声明messageQueue为应该在线程之间共享的对象,然后线程 API 可以处理锁定/解锁。我可以想到一个包装类,或者类似的东西。尽管其他 API(增强线程或其他库)也可以接受,但基于 POSIX 的解决方案是首选。

在类似的情况下你会实施什么?

为未来的读者更新

我找到了这个。我猜这将成为 C++14 的一部分。

4

3 回答 3

2

您可以boost:scoped_lock在这种情况下使用。一旦超出范围,它就会优雅地解锁:

 boost::mutex mMutex;//member mutex object defined somewhere


{ //scope operator start
   boost::mutex::scoped_lock scopedLock(mMutex);
   pthread_mutex_lock();     // scoped lock the door
   P_Message x = this->messageQueue.front(); // do the job
   this->messageQueue.pop(); 
} //scope operator end, unlock mutex

// somewhere else, in another thread
while (true) {
    P_Message message;
    solver->listener->recvMessage(message);
    boost::mutex::scoped_lock scopedLock(mMutex);     // scoped lock the door
    solver->messageQueue.push(message);           // do the job
    sem_post(&solver->messageCount);
} //scope operator end, unlock mutex
于 2013-07-03T08:11:25.970 回答
2

为此,我将子类(is-a)或包含(has-a)消息队列类到另一个强制使用互斥锁的类中。

这在功能上就是其他语言所做的,例如使用 Javasynchronized关键字 - 它修改了底层对象以自动保护。

于 2013-07-03T08:12:08.487 回答
1

消息队列本身应该处理锁定(是原子的),而不是调用代码。你需要的不仅仅是一个互斥锁,你还需要一个条件来避免竞争条件。标准的成语是这样的:

class ScopedLock    //  You should already have this one anyway
{
    pthread_mutex_t& myOwned;
    ScopedLock( ScopedLock const& );
    ScopedLock& operator=( ScopedLock const& );
public:
    ScopedLock( pthread_mutex_t& owned )
        : myOwned( owned )
    {
        pthread_mutex_lock( &myOwned );
    }
    ~ScopedLock()
    {
        pthread_mutex_unlock( &myOwned );
    }
};

class MessageQueue
{
    std::deque<Message> myQueue;
    pthread_mutex_t myMutex;
    pthread_cond_t myCond;
public:
    MessageQueue()
    {
        pthread_mutex_init( &myMutex );
        pthread_cond_init( &myCond );
    }
    void push( Message const& message )
    {
        ScopedLock( myMutex );
        myQueue.push_back( message );
        pthread_cond_broadcast( &myCond );
    }

    Message pop()
    {
        ScopedLock( myMutex );
        while ( myQueue.empty() ) {
            pthread_cond_wait( &myCond, &myMutex );
        }
        Message results = myQueue.front();
        myQueue.pop_front();
        return results;
    }
};

这需要更多的错误处理,但基本结构就在那里。

当然,如果您可以使用 C++11,则最好使用标准线程原语。(否则,我通常会建议使用 Boost 线程。但如果您已经在使用 Posix 线程,则可能需要等待转换,直到可以使用标准线程,而不是转换两次。)但是您仍然需要两个互斥锁和一个条件。

于 2013-07-03T08:27:39.563 回答