1

我在

http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue

并且测试非常简单:

mpmc_bounded_queue<string> mq(8) ;

for(int idx=0;idx<10;idx++)
{
    string s = to_string(idx);
    if(mq.enqueue(s))
        cout << s << " ok" << endl ;
    else
        cout << s << " not ok" << endl ;
}

string strx;    
if(mq.dequeue(strx))
{
    cout << "The dequeue value=" << strx << endl ;
}else{
    assert(1 == 2) ;
}

string s = "THE END" ;
if(mq.enqueue(s))
    cout << s << " ok" << endl ;
else
    cout << s << " not ok" << endl ;

在我研究了这段代码之后,我想确定我是否可以正确使用这个源,假设有一个线程 C 等待一个信号量,唤醒后(信号量>0),线程 C 将调用 dequeue 函数从队列中获取数据!!线程A和线程B会调用入队函数,插入一条数据后,信号量+1,所以如果线程A和线程B入队10次,信号量为10,那么线程C应该可以出队10次,那应该到目前为止工作完美!

但我认为可能会发生一件事!...假设线程 A 和线程 B 同时入队,在 Compare-And-Swap 之后,线程 A 应该填充缓冲区 [0],线程 B 应该填充缓冲区 [1],线程 A 没有完成入队并退出由操作系统,所以缓冲区[0]的sequence_没有被修改,线程B完成入队并增加信号量,此时,线程A没有完成缓冲区[0],线程B完成缓冲区[1],线程c被唤醒因为 semaphore = 1 ,所以线程 C 将出队,它试图访问尚未准备好的缓冲区 [0] ,所以出队将返回 false ,这意味着线程 C 应该实现如下:

Right !!

while(1){
    sem_wait(sem1) ;
    while(1){
        if(mq.dequeue(strx))
            break ;
    }
    //doing something about strx
}

=====================================

Wrong !!

while(1){
    sem_wait(sem1) ;
    mq.dequeue(strx) ;
    //doing something about strx
}
=====================================

我的意思是,线程 C 被唤醒意味着有新数据入队,但它可能不在 buffer[dequeue_pos_] 的位置,所以将 dequeue 函数放入无限循环中会有帮助!!!!

我在这个工具中正确吗?我的意思是,应该有机会在 buffer[0] 之前填充 buffer[1] ,并且线程 C 唤醒试图使 buffer[0] 出队失败,保持出队在无限循环中不会永远等待.....因为 while线程A被操作系统切换,缓冲区[0]填充完成,线程C将中断循环!

任何建议,评论表示赞赏!

4

0 回答 0