我在
http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
并且测试非常简单:
mpmc_bounded_queue<string> mq(8) ;
for(int idx=0;idx<10;idx++)
{
string s = to_string(idx);
if(mq.enqueue(s))
cout << s << " ok" << endl ;
else
cout << s << " not ok" << endl ;
}
string strx;
if(mq.dequeue(strx))
{
cout << "The dequeue value=" << strx << endl ;
}else{
assert(1 == 2) ;
}
string s = "THE END" ;
if(mq.enqueue(s))
cout << s << " ok" << endl ;
else
cout << s << " not ok" << endl ;
在我研究了这段代码之后,我想确定我是否可以正确使用这个源,假设有一个线程 C 等待一个信号量,唤醒后(信号量>0),线程 C 将调用 dequeue 函数从队列中获取数据!!线程A和线程B会调用入队函数,插入一条数据后,信号量+1,所以如果线程A和线程B入队10次,信号量为10,那么线程C应该可以出队10次,那应该到目前为止工作完美!
但我认为可能会发生一件事!...假设线程 A 和线程 B 同时入队,在 Compare-And-Swap 之后,线程 A 应该填充缓冲区 [0],线程 B 应该填充缓冲区 [1],线程 A 没有完成入队并退出由操作系统,所以缓冲区[0]的sequence_没有被修改,线程B完成入队并增加信号量,此时,线程A没有完成缓冲区[0],线程B完成缓冲区[1],线程c被唤醒因为 semaphore = 1 ,所以线程 C 将出队,它试图访问尚未准备好的缓冲区 [0] ,所以出队将返回 false ,这意味着线程 C 应该实现如下:
Right !!
while(1){
sem_wait(sem1) ;
while(1){
if(mq.dequeue(strx))
break ;
}
//doing something about strx
}
=====================================
Wrong !!
while(1){
sem_wait(sem1) ;
mq.dequeue(strx) ;
//doing something about strx
}
=====================================
我的意思是,线程 C 被唤醒意味着有新数据入队,但它可能不在 buffer[dequeue_pos_] 的位置,所以将 dequeue 函数放入无限循环中会有帮助!!!!
我在这个工具中正确吗?我的意思是,应该有机会在 buffer[0] 之前填充 buffer[1] ,并且线程 C 唤醒试图使 buffer[0] 出队失败,保持出队在无限循环中不会永远等待.....因为 while线程A被操作系统切换,缓冲区[0]填充完成,线程C将中断循环!
任何建议,评论表示赞赏!