4

我需要一些机制让人想起Win32重置事件,我可以通过具有相同的语义与WaitForSingleObject()和WaitForMultipleObjects()(只需要..SingleObject()版本此时此刻) 。但我的目标是多平台,所以我拥有的是boost :: threads(AFAIK)。我想出了下面的课,想问一下潜在的问题以及是否能够完成任务。提前致谢。Win32重置事件像同步类与升压C++

class reset_event 
{ 
bool flag, auto_reset; 
boost::condition_variable cond_var; 
boost::mutex mx_flag; 

public: 
reset_event(bool _auto_reset = false) : flag(false), auto_reset(_auto_reset) 
{ 
} 

void wait() 
{ 
    boost::unique_lock<boost::mutex> LOCK(mx_flag); 
    if (flag) 
    return; 

    cond_var.wait(LOCK); 
    if (auto_reset) 
    flag = false; 
} 

bool wait(const boost::posix_time::time_duration& dur) 
{ 
    boost::unique_lock<boost::mutex> LOCK(mx_flag); 
    bool ret = cond_var.timed_wait(LOCK, dur) || flag; 
    if (auto_reset && ret) 
    flag = false; 

    return ret; 
} 

void set() 
{ 
    boost::lock_guard<boost::mutex> LOCK(mx_flag); 
    flag = true; 
    cond_var.notify_all(); 
} 

void reset() 
{ 
    boost::lock_guard<boost::mutex> LOCK(mx_flag); 
    flag = false; 
} 
}; 

示例用法;

reset_event terminate_thread; 

void fn_thread() 
{ 
while(!terminate_thread.wait(boost::posix_time::milliseconds(10))) 
{ 
    std::cout << "working..." << std::endl; 
    boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); 
} 

std::cout << "thread terminated" << std::endl; 
} 

int main() 
{ 
boost::thread worker(fn_thread); 

boost::this_thread::sleep(boost::posix_time::seconds(1)); 
terminate_thread.set(); 

worker.join(); 

return 0; 
} 

编辑

我照着迈克尔·伯尔的建议固定代码。我的“非常简单”测试表明没有问题。

class reset_event 
{ 
    bool flag, auto_reset; 
    boost::condition_variable cond_var; 
    boost::mutex mx_flag; 

public: 
    explicit reset_event(bool _auto_reset = false) : flag(false), auto_reset(_auto_reset) 
    { 
    } 

    void wait() 
    { 
     boost::unique_lock<boost::mutex> LOCK(mx_flag); 
     if (flag) 
     { 
      if (auto_reset) 
       flag = false; 
      return; 
     } 

     do 
     { 
      cond_var.wait(LOCK); 
     } while(!flag); 

     if (auto_reset) 
      flag = false; 
    } 

    bool wait(const boost::posix_time::time_duration& dur) 
    { 
     boost::unique_lock<boost::mutex> LOCK(mx_flag); 
     if (flag) 
     { 
      if (auto_reset) 
       flag = false; 
      return true; 
     } 

     bool ret = cond_var.timed_wait(LOCK, dur); 
     if (ret && flag) 
     { 
      if (auto_reset) 
       flag = false; 

      return true; 
     } 

     return false; 
    } 

    void set() 
    { 
     boost::lock_guard<boost::mutex> LOCK(mx_flag); 
     flag = true; 
     cond_var.notify_all(); 
    } 

    void reset() 
    { 
     boost::lock_guard<boost::mutex> LOCK(mx_flag); 
     flag = false; 
    } 
}; 

回答

3

,你要检查/修复有几件事情(注意 - 我绝不是说,只有这些东西 - 我只有一个快速浏览一下):

  • wait()功能,你不重置已经告知的事件,如果它的成立为auto_reset

    void wait() 
    { 
        boost::unique_lock<boost::mutex> LOCK(mx_flag); 
        if (flag) { 
        if (auto_reset) flag = false; // <-- I think you need this 
        return; 
        } 
    
        cond_var.wait(LOCK); 
        if (auto_reset) 
        flag = false; 
    } 
    
  • wait(const boost::posix_time::time_duration& dur)你应该检查flag之前等待条件变量。

  • 这两个wait函数中,如果您等待条件变量,您可能需要重新检查该标志以确保其他某个线程在此期间未重置事件。对于auto_reset事件尤其如此,即使多个线程正在等待事件时,也应该释放单个服务器。

+0

非常感谢Michael。你能解释一下最后一项吗? – fgungor 2011-01-14 16:33:15

1

这是我的版本,稍作调整以实现以下目标。

  • 没有使用set(),reset()等阻塞生产者,而是计算“发布”的数量,而不是在布尔条件为真后丢失1:1映射。
  • 允许外部调用者指定wait()的互斥量,它在我的使用场景中通常是外部资源,并且可以与内部互斥量分开。
  • 将set(),reset_one(),reset_all()调用移至内部互斥量,现在它们在消费者调用wait()之前重复调用时不会阻塞。

现在我的加载线程可以排队多个长期请求,而不会在繁忙处理时丢弃任何任务。

进展在我的项目中使用....

升压条件变量: - >发送3个负载请求,线程是忙,只能看到使用布尔1或2
发布答: - >由于共享互斥,发送3个加载请求,第2个请求上的生产者块。生产者在第一次加载请求处理之前不会解锁。
我的版本 - >送3个的负载要求,生产者立即从所有3个返回,消费者看到3载荷需求缓慢但肯定:)

希望这可以帮助别人那里。

 

    class cNonLossyCondition 
    { 
     bool flag, auto_reset; 
     boost::condition_variable cond_var; 
     int lost_signals; 
     boost::mutex internal_mutex; 

    public: 
     cNonLossyCondition(bool _auto_reset) 
     { 
      this->flag = false; 
      this->auto_reset = auto_reset; 
      this->lost_signals = 0; 
     } 

     void wait(boost::mutex* mx_flag) 
     { 
      boost::unique_lock LOCK(*mx_flag); 
      if (flag) 
      { 
       if (auto_reset) 
        this->reset_one(); 
       return; 
      } 

      do 
      { 
       cond_var.wait(LOCK); 
      } while(!flag); 

      if (auto_reset) 
       this->reset_one(); 
     } 

     bool wait(boost::mutex* mx_flag,const boost::posix_time::time_duration& dur) 
     { 
      boost::unique_lock LOCK(*mx_flag); 
      if (flag) 
      { 
       if (auto_reset) 
        this->reset_one(); 
       return true; 
      } 

      bool ret = cond_var.timed_wait(LOCK, dur); 
      if (ret && flag) 
      { 
       if (auto_reset) 
        this->reset_one(); 

       return true; 
      } 

      return false; 
     } 

     void set() 
     { 
      boost::lock_guard LOCK(this->internal_mutex); 
      flag = true; 
      if (this->lost_signals lost_signals = 1; //already incremented 
      } else { 
       this->lost_signals = this->lost_signals + 1; 
      } 

      cond_var.notify_all(); 
     } 

     void reset_one() 
     { 
      boost::lock_guard LOCK(this->internal_mutex); 
      this->lost_signals = this->lost_signals - 1; 
      if (this->lost_signals lost_signals = 0; 
       flag = false; 
      } 

     } 
     void reset_all() 
     { 
      boost::lock_guard LOCK(this->internal_mutex); 
      flag = false; 
      this->lost_signals = 0; 
     } 
    };