3

Here's a fine-grained locking queue introduced by Anthony Williams in chapter 6.2.3 C++ Concurrency in Action.

/*
    pop only need lock head_mutex and a small section of tail_mutex,push only need
    tail_mutex mutex.maximum container concurrency.
*/
template<typename T> class threadsafe_queue
{
    private:
    struct node
    {
        std::shared_ptr<T> data;
        std::unique_ptr<node> next;
    }
    std::mutex head_mutex;   //when change the head lock it.
    std::unique_ptr<node> head;  
    std::mutex tail_mutex;   //when change the tail lock it.
    node* tail;
    std::condition_variable data_cond;

    node* get_tail()
    {
        std::lock_guard<std::mutex> tail_lock(tail_mutex);
        return tail;
    }

    public:
    /* 
        create a dummy node
    */
    threadsafe_queue():
        head(new node),tail(head.get())
    {}

    std::shared_ptr<T> wait_and_pop()
    {
        std::unique_lock<std::mutex> head_lock;
        data_cond.wait(head_lock,[&]{return head.get()!=get_tail();}); //#1
        std::unique_ptr<node> old_head=std::move(head);
        head=std::move(old_head->next);
        return old_head;
    }

    void push(T new_value)
    {
        std::shared_ptr<T> new_data(
        std::make_shared<T>(std::move(new_value)));
        std::unique_ptr<node> p(new node);
        {
            std::lock_guard<std::mutex> tail_lock(tail_mutex);
            tail->data=new_data;
            node* const new_tail=p.get();
            tail->next=std::move(p);
            tail=new_tail;
        }
        data_cond.notify_one();
    }
}

Here's the situation: There are two threads (thread1 and thread2). thread1 is doing a wait_and_pop and thread2 is doing a push. The queue is empty.

thread1 is in #2, had already checked head.get()!=get_tail() before data_cond.wait(). At this time its CPU period had run out. thread2 begins.

thread2 finished the push function and did data_cond.notify_one(). thread1 begins again.

Now thread1 begins data_cond.wait(), but it waits forever.

Would this situation possibly happen ?If so, how to get this container fixed ?

4

2 回答 2

7

是的,OP 中描述的情况是可能的,并且会导致通知丢失。在谓词函数中注入一个不错的大时间延迟可以很容易地触发。这是 Coliru 的演示。请注意程序如何需要 10 秒才能完成(超时的长度wait_for)而不是 100 毫秒(生产者在队列中插入项目的时间)。通知丢失。

在条件变量的设计中有一个隐含的假设,即当关联的互斥锁被锁定时,条件的状态(谓词的返回值)不能改变。对于此队列实现而言,情况并非如此,因为push可以在不持有 的情况下更改队列的“空” head_mutex

§30.5p3 指定wait具有三个原子部分:

  1. 释放互斥体,进入等待状态;
  2. 解除等待;和
  3. 重新获取锁。

请注意,这些都没有提到检查谓词(如果有的话)wait。with 谓词的行为wait在 §30.5.1p15 中描述:

效果:

而(!pred())
      等待(锁定);

请注意,这里也不能保证谓词检查和wait是原子执行的。有一个先决条件lock锁定,并且它是由调用线程持有的关联互斥锁。

至于修复容器以避免丢失通知,我会将其更改为单个互斥体实现并完成它。push当和pop最终都采用相同的互斥锁 ( )时,将其称为细粒度锁定有点tail_mutex牵强。

于 2013-08-01T08:15:45.223 回答
0

data_cond.wait()每次唤醒时检查条件。所以即使它可能已经被检查过了,它会在之后再次被检查data_cond.notify_one()。此时,有数据要弹出(因为线程 2 刚刚完成推送),所以它返回。在这里阅读更多。

您唯一应该担心的是,当您调用wait_and_pop一个空队列然后不再向其推送任何数据时。此代码没有超时等待和返回错误(或抛出异常)的机制。

于 2013-08-01T03:19:00.493 回答