I am learning my way through synchronization primitives with C++ 11. I have to write those methods for a template class which is a FIFO queue whose maximum number of elements is declared at construction time.
There are two threads which push items into said queue and two that retrieve them. They are synchronized by using two condition variables to make sure that consumer threads only pop items when the queue is not empty and that producer threads push new items only when the queue is not full. The queue has got a open/close status which is used as an additional condition in the wait() call of both condition variable. When the queue is closed, threads should return without performing any operation.
// main.cpp
#include "stdafx.h"
int _tmain(int argc, _TCHAR* argv[]){
BlockingQueue<int> bq(10);
int sum1=0, sum2=0;
std::thread c1([&bq,&sum1](){
int i;
while(bq.get(i)) sum1+=i;
});
std::thread c2([&bq,&sum2](){
int i;
while(bq.get(i)) sum2+=i;
});
std::thread p1([&bq](){
for(int i=0;i<1000;i+=2) bq.put(i);
});
std::thread p2([&bq](){
for(int i=0;i<1000;i+=2) bq.put(i+1);
});
p1.join();
std::cout<<"p1 thread returned."<<std::endl;
p2.join();
std::cout<<"p2 thread returned."<<std::endl;
bq.close();
c1.join();
std::cout<<"c1 thread returned."<<std::endl;
c2.join();
std::cout<<"c2 thread returned."<<std::endl;
std::cout<<"sum1: "<<sum1<<std::endl;
std::cout<<"sum2: "<<sum2<<std::endl;
std::cout<<"total: "<<sum1+sum2<<std::endl;
return 0;
}
Here's the class I've created:
// BlockingQueue.h
#include "stdafx.h"
template<class T> class BlockingQueue
{
std::mutex t_queue_mutex;
std::queue<T> t_queue;
int t_queue_cap_value;
bool isQueueOpen;
std::condition_variable put_condition;
std::condition_variable get_condition;
public:
BlockingQueue(int N);
~BlockingQueue(void);
bool put(T t_item);
bool get(T &t_item);
bool isOpen();
bool isFull();
bool isEmpty();
void close();
};
// BlockinQueue.cpp
#include "BlockingQueue.h"
#include "stdafx.h"
template <class T> BlockingQueue<T>::BlockingQueue(int N)
{
t_queue_cap_value=N;
isQueueOpen=true;
std::cout<<"Rejoice! A bq has been created!"<<std::endl;
}
template <class T> BlockingQueue<T>::~BlockingQueue(void)
{
}
template <class T> bool BlockingQueue<T>::isFull(){
if(t_queue_cap_value==t_queue.size())
return true;
else
return false;
}
template <class T> bool BlockingQueue<T>::isOpen(){
return isQueueOpen;
}
template <class T> void BlockingQueue<T>::close(){
isQueueOpen=false;
}
/* get method */
template <class T> bool BlockingQueue<T>::get(T &t_item){
bool exitThreadStatus=false;
if(!isOpen()){
put_condition.notify_all();
return false;
}
std::unique_lock<std::mutex> ul(t_queue_mutex);
get_condition.wait(ul, [this](){
//std::cout<<"Getter thread with get_id()="<<std::this_thread::get_id()<<" is waiting. isOpen()="<<isOpen()<<" and t_queue.empty()="<<t_queue.empty()<<std::endl;
if(!isOpen())
return true;
else
return !t_queue.empty();
});
if(isOpen()){
exitThreadStatus=true;
t_item=t_queue.front();
t_queue.pop();
}
std::cout<<"Extracted "<<t_item<<". After pop size()="<<t_queue.size()<<std::endl;
put_condition.notify_all();
return exitThreadStatus;
}
/* put method */
template <class T> bool BlockingQueue<T>::put(T t_item){
bool exitThreadStatus=false;
if(!isOpen()){
get_condition.notify_all();
return false;
}
std::unique_lock<std::mutex> ul(t_queue_mutex);
put_condition.wait(ul, [this](){
if(!isOpen())
return true;
else
return !isFull();
});
if(isOpen()){
exitThreadStatus=true;
t_queue.push(t_item);
}
std::cout<<"Inserting "<<t_item<<". After push size()="<<t_queue.size()<<std::endl;
get_condition.notify_all();
return exitThreadStatus;
}
template class BlockingQueue<int>;
It seems to be working correctly whenever I leave the two std::cout lines in get() and put() uncommented, getting the following output (as expected):
Inserting 998. After push size()=2
Extracted 997. After pop size()=1
p1 thread returned.
Inserting 999. After push size()=2
Extracted 998. After pop size()=1
p2 thread returned.
Extracted 999. After pop size()=0
Extracted 998. After pop size()=0
c1 thread returned.
c2 thread returned.
sum1: 250000
sum2: 249500
total: 499500
If I instead comment the cout lines, the two collecting threads never seem to return and I can't understand what is wrong with my code. Does anyone have a clue? Thank you!
Output with commented cout lines:
Rejoice! A bq has been created!
p1 thread returned.
p2 thread returned.