4

我需要一些让人想起 Win32 重置事件的机制,我可以通过与 WaitForSingleObject() 和 WaitForMultipleObjects() 具有相同语义的函数来检查它们(目前只需要 ..SingleObject() 版本)。但我的目标是多个平台,所以我只有 boost::threads (AFAIK) 。我想出了以下课程,并想询问潜在的问题以及是否可以完成任务。提前致谢。

class reset_event
{
 bool flag, auto_reset;
 boost::condition_variable cond_var;
 boost::mutex mx_flag;

public:
 reset_event(bool _auto_reset = false) : flag(false), auto_reset(_auto_reset)
 {
 }

 void wait()
 {
  boost::unique_lock<boost::mutex> LOCK(mx_flag);
  if (flag)
   return;

  cond_var.wait(LOCK);
  if (auto_reset)
   flag = false;
 }

 bool wait(const boost::posix_time::time_duration& dur)
 {
  boost::unique_lock<boost::mutex> LOCK(mx_flag);
  bool ret = cond_var.timed_wait(LOCK, dur) || flag;
  if (auto_reset && ret)
   flag = false;

  return ret;
 }

 void set()
 {
  boost::lock_guard<boost::mutex> LOCK(mx_flag);
  flag = true;
  cond_var.notify_all();
 }

 void reset()
 {
  boost::lock_guard<boost::mutex> LOCK(mx_flag);
  flag = false;
 }
};

示例用法;

reset_event terminate_thread;

void fn_thread()
{
 while(!terminate_thread.wait(boost::posix_time::milliseconds(10)))
 {
  std::cout << "working..." << std::endl;
  boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
 }

 std::cout << "thread terminated" << std::endl;
}

int main()
{
 boost::thread worker(fn_thread);

 boost::this_thread::sleep(boost::posix_time::seconds(1));
 terminate_thread.set();

 worker.join();

 return 0;
}

编辑

我已根据 Michael Burr 的建议修复了代码。我的“非常简单”的测试表明没有问题。

class reset_event
{
    bool flag, auto_reset;
    boost::condition_variable cond_var;
    boost::mutex mx_flag;

public:
    explicit reset_event(bool _auto_reset = false) : flag(false), auto_reset(_auto_reset)
    {
    }

    void wait()
    {
        boost::unique_lock<boost::mutex> LOCK(mx_flag);
        if (flag)
        {
            if (auto_reset)
                flag = false;
            return;
        }

        do
        {
            cond_var.wait(LOCK);
        } while(!flag);

        if (auto_reset)
            flag = false;
    }

    bool wait(const boost::posix_time::time_duration& dur)
    {
        boost::unique_lock<boost::mutex> LOCK(mx_flag);
        if (flag)
        {
            if (auto_reset)
                flag = false;
            return true;
        }

        bool ret = cond_var.timed_wait(LOCK, dur);
        if (ret && flag)
        {
            if (auto_reset)
                flag = false;

            return true;
        }

        return false;
    }

    void set()
    {
        boost::lock_guard<boost::mutex> LOCK(mx_flag);
        flag = true;
        cond_var.notify_all();
    }

    void reset()
    {
        boost::lock_guard<boost::mutex> LOCK(mx_flag);
        flag = false;
    }
};
4

2 回答 2

3

您需要检查/修复的一些事情(注意 - 我绝不是说这些是唯一的事情 - 我只是快速浏览了一下):

  • 在您的wait()函数中,如果已设置为:您不会重置已发出信号的事件auto_reset

     void wait()
     {
      boost::unique_lock<boost::mutex> LOCK(mx_flag);
      if (flag) {
       if (auto_reset) flag = false;    // <-- I think you need this
       return;
      }
    
      cond_var.wait(LOCK);
      if (auto_reset)
       flag = false;
     }
    
  • 在等待条件变量 之前wait(const boost::posix_time::time_duration& dur),您应该检查一下。flag

  • 在这两个wait函数中,如果您等待条件变量,您可能需要重新检查标志以确保其他线程在此期间没有重置事件。对于 auto_reset 事件尤其如此,即使多个线程正在等待该事件,它也应该只释放一个服务员。

于 2011-01-14T16:11:38.947 回答
1

这是我的版本,稍作调整以实现以下目标。

  • 不使用 set()、reset() 等阻止生产者,而是计算“释放”的数量,而不是在布尔条件为真后丢失 1:1 映射。
  • 允许外部调用者为 wait() 指定互斥锁,在我的使用场景中它通常是外部资源,并且可以与内部互斥锁分开。
  • 将 set()、reset_one()、reset_all() 调用移至内部互斥体,现在它们在消费者调用 wait() 之前重复调用时不会阻塞。

现在我的加载线程可以将多个长期存在的请求排队,而不会在忙处理时丢弃任何任务。

我的项目中的进展使用....

Boost 条件变量: -> 发送 3 个加载请求,线程很忙,只看到 1 或 2。
使用 Bool 发布答案: -> 发送 3 个加载请求,由于共享互斥锁,生产者在第二个请求上阻塞。生产者在处理第一个加载请求之前不会解除阻塞。
我的版本-> 发送 3 个加载请求,生产者立即从所有 3 个返回,消费者缓慢但肯定地看到 3 个加载请求:)

希望它可以帮助那里的人。

    类 cNonLossyCondition
    {
        布尔标志,自动重置;
        boost::condition_variable cond_var;
        int lost_signals;
        boost::mutex internal_mutex;

    上市:
        cNonLossyCondition(bool _auto_reset)
        {
            这->标志=假;
            this->auto_reset = auto_reset;
            this->lost_signals = 0;
        }

        无效等待(升压::互斥* mx_flag)
        {
            boost::unique_lock LOCK(*mx_flag);
            如果(标志)
            {
                如果(自动重置)
                    this->reset_one();
                返回;
            }

            做
            {
                cond_var.wait(LOCK);
            } 而(!标志);

            如果(自动重置)
                this->reset_one();
        }

        bool wait(boost::mutex* mx_flag,const boost::posix_time::time_duration& dur)
        {
            boost::unique_lock LOCK(*mx_flag);
            如果(标志)
            {
                如果(自动重置)
                    this->reset_one();
                返回真;
            }

            bool ret = cond_var.timed_wait(LOCK, dur);
            if (ret && 标志)
            {
                如果(自动重置)
                    this->reset_one();

                返回真;
            }

            返回假;
        }

        无效集()
        {
            boost::lock_guard LOCK(this->internal_mutex);
            标志=真;
            if (this->lost_signals lost_signals = 1; //已经增加
            } 别的 {
                this->lost_signals = this->lost_signals + 1;
            }

            cond_var.notify_all();
        }

        无效的 reset_one()
        {
            boost::lock_guard LOCK(this->internal_mutex);
            this->lost_signals = this->lost_signals - 1;
            if (this->lost_signals lost_signals = 0;
                标志=假;
            }

        }
        无效重置全部()
        {
            boost::lock_guard LOCK(this->internal_mutex);
            标志=假;
            this->lost_signals = 0;
        }
    };

于 2012-08-30T03:01:48.190 回答