4

好吧,我正在尝试处理某种队列。我有一个IO 线程,它专用于从 std::queue 中弹出数据,但问题是我正在使用 Sleep()以防止 100% cpu 常量检查。当然还有其他线程会将项目添加到 std::queue。

我怎样才能使线程处于休眠状态并且在 std::queue 不为空时启动?

IO线程

Sleep(100);
while (!myqueue.empty())
  {
     //process data FIFO
     myqueue.pop(); //pop out and continue
  }

非常感谢,谢谢!哦,这对于 c++11 或 c++03 没关系 - 在 Windows 上。

4

3 回答 3

7

std::queue线程完全无关。完全没有。它的.empty()成员不是线程安全的(只能重入)!这同样适用于它的所有其他成员。因此,多个线程可以随意使用不同的队列,但每次只有一个线程可以对每个实例执行任何操作。

C++11 或 C++03很重要。因为 C++11 定义了线程同步原语,而 C++03 没有,你必须使用 OS API。

在 C++11 中,您会对std::condition_variable.

在 C++03 中,您会对Boost.Thread(大部分与 C++11 兼容)EventsSemaphores感兴趣。

在任何一种情况下,它们std::queue::push()std::queue::pop()它们本身都必须受到互斥的保护。std::condition_variable甚至强制您使用一个 ( ) std::mutex,在 Windows API 中您将使用Critical Section

在 Windows 上,C++11 类仅在 Visual Studio 2012 和 Windows 8 中可用。对于较旧的编译器,请使用 Boost(优点是可移植)或本机 API。

于 2013-02-04T12:01:51.983 回答
1

你需要一个“条件变量”。每当一个线程将某些东西放在队列中时,它都会“通知”等待条件变量的线程。从队列中消费事件的线程等待条件变量。在有人通过条件变量通知它之前,它处于休眠状态。

Boost有一个很好的实现: http: //www.boost.org/doc/html/thread/synchronization.html#thread.synchronization.condvar_ref

您正在使用锁来确保对队列的访问是线程安全的,不是吗?

于 2013-02-04T12:00:55.657 回答
0

我的建议是首先研究如何使用线程安全队列,然后考虑使用 boost::condition 信号来提供更多控制。

这是一个如何构建线程安全队列的示例:

#pragma once                                                                              
#include <queue>                                                                       

template<typename T>                                                                       
class thread_safe_queue                                                                    
{                                                                                          
    queue<T>   m_queue;                                                                    
    pthread_mutex_t m_mutex;                                                               
    pthread_cond_t  m_condv;                                                               

    public:                                                                                
    thread_safe_queue() {                                                                  
        pthread_mutex_init(&m_mutex, NULL);                                                
        pthread_cond_init(&m_condv, NULL);                                                 
    }                                                                                      
    ~thread_safe_queue() {                                                                 
        pthread_mutex_destroy(&m_mutex);                                                   
        pthread_cond_destroy(&m_condv);                                                    
    }                                                                                      
    void push(T& item) {                                                                   
        pthread_mutex_lock(&m_mutex);                                                      

        T itemcpy = std::move(item);                                                       
        m_queue.push(std::move(itemcpy));                                                  

        pthread_cond_signal(&m_condv);                                                     
        pthread_mutex_unlock(&m_mutex);                                                    
    }                                                                                      
    T pop() {                                                                              
        pthread_mutex_lock(&m_mutex);                                                      
        while (m_queue.size() == 0) {                                                      
            pthread_cond_wait(&m_condv, &m_mutex);                                         
        }                                                                                  

        T& _item = m_queue.front();                                                        
        T itemcpy = std::move(_item);                                                      

        m_queue.pop();                                                                     
        pthread_mutex_unlock(&m_mutex);                                                    
        return itemcpy;                                                                    
    }                                                                                      
    int size() {                                                                           
        pthread_mutex_lock(&m_mutex);                                                      
        int size = m_queue.size();                                                         
        pthread_mutex_unlock(&m_mutex);                                                    
        return size;                                                                       
    }                                                                                      
};                                                                            

这就是你实例化它的方式:

thread_safe_queue<myclass> myqueue;

如果您想使用事件信号,请考虑使用 boost::condition - fx。像这样:

#include <boost/thread/condition.hpp>
#include <boost/thread/mutex.hpp>

boost::mutex mtxWait;
boost::condition cndSignalQueueHasNewEntry;

bool WaitForQueueSignal(long milliseconds)
{
    boost::mutex::scoped_lock mtxWaitLock(mtxWait);
    boost::posix_time::time_duration wait_duration =  boost::posix_time::milliseconds(milliseconds); // http://www.boost.org/doc/libs/1_34_0/doc/html/date_time/posix_time.html
    boost::system_time const timeout=boost::get_system_time()+wait_duration; // http://www.justsoftwaresolutions.co.uk/threading/condition-variable-spurious-wakes.html
    return cndSignalQueueHasNewEntry.timed_wait(mtxWait,timeout); // wait until signal notify_one or timeout
}

这就是你可以发出信号的方式

cndSignalQueueHasNewEntry.notify_one();

这就是你可以等待信号的方式

bool bResult = WaitForQueueSignal(10000); // timeout after 10 seconds
于 2017-01-19T12:54:10.443 回答