2010-11-25 57 views
1

我想获得关于下面列出的IService类的一些反馈。据我所知,这种类型与“活动对象”模式有关。如果我错误地使用任何相关术语,请原谅/纠正。基本上这个想法是,使用这个活动对象类的类需要提供一个控制某个事件循环的start和stop方法。这个事件循环可以通过while循环或boost asio等实现。线程相关的活动对象设计问题(C++ boost)

该类负责以非阻塞的方式启动一个新线程,以便事件可以在新线程中处理。它还必须处理所有清理相关的代码。我首先尝试了一种面向对象方法,其中子类负责覆盖方法来控制事件循环,但清理很麻烦:在析构函数调用stop方法时,在调用类没有手动调用停止方法。模板化的解决方案似乎是一个更加简洁:

template <typename T> 
class IService : private boost::noncopyable 
{ 
    typedef boost::shared_ptr<boost::thread> thread_ptr; 
public: 

    IService() 
    { 
    } 

    ~IService() 
    { 
    /// try stop the service in case it's running 
    stop(); 
    } 

    void start() 
    { 
    boost::mutex::scoped_lock lock(m_threadMutex); 

    if (m_pServiceThread && m_pServiceThread->joinable()) 
    { 
     // already running 
     return; 
    } 

    m_pServiceThread = thread_ptr(new boost::thread(boost::bind(&IService::main, this))); 

    // need to wait for thread to start: else if destructor is called before thread has started 

    // Wait for condition to be signaled and then 
    // try timed wait since the application could deadlock if the thread never starts? 
    //if (m_startCondition.timed_wait(m_threadMutex, boost::posix_time::milliseconds(getServiceTimeoutMs()))) 
    //{ 
    //} 
    m_startCondition.wait(m_threadMutex); 

    // notify main to continue: it's blocked on the same condition var 
    m_startCondition.notify_one(); 
    } 

    void stop() 
    { 
    // trigger the stopping of the event loop 
    m_serviceObject.stop(); 

    if (m_pServiceThread) 
    { 
     if (m_pServiceThread->joinable()) 
     { 
     m_pServiceThread->join(); 
     } 
     // the service is stopped so we can reset the thread 
     m_pServiceThread.reset(); 
    } 
    } 

private: 
    /// entry point of thread 
    void main() 
    { 
    boost::mutex::scoped_lock lock(m_threadMutex); 
    // notify main thread that it can continue 
    m_startCondition.notify_one(); 

    // Try Dummy wait to allow 1st thread to resume??? 
    m_startCondition.wait(m_threadMutex); 

    // call template implementation of event loop 
    m_serviceObject.start(); 
    } 

    /// Service thread 
    thread_ptr m_pServiceThread; 
    /// Thread mutex 
    mutable boost::mutex m_threadMutex; 
    /// Condition for signaling start of thread 
    boost::condition m_startCondition; 

    /// T must satisfy the implicit service interface and provide a start and a stop method 
    T m_serviceObject; 
}; 

类可以使用如下:

class TestObject3 
{ 
public: 
    TestObject3() 
     :m_work(m_ioService), 
     m_timer(m_ioService, boost::posix_time::milliseconds(200)) 
    { 
     m_timer.async_wait(boost::bind(&TestObject3::doWork, this, boost::asio::placeholders::error)); 
    } 

    void start() 
    { 
     // simple event loop 
     m_ioService.run(); 
    } 

    void stop() 
    { 
     // signal end of event loop 
     m_ioService.stop(); 
    } 

    void doWork(const boost::system::error_code& e) 
    { 
     // Do some work here 
     if (e != boost::asio::error::operation_aborted) 
     { 
     m_timer.expires_from_now(boost::posix_time::milliseconds(200)); 
     m_timer.async_wait(boost::bind(&TestObject3::doWork, this, boost::asio::placeholders::error)); 
     } 
    } 

private: 
    boost::asio::io_service m_ioService; 
    boost::asio::io_service::work m_work; 
    boost::asio::deadline_timer m_timer; 
}; 

现在我的具体问题:

1)是利用升压条件变量正确吗?这对我来说似乎有些破绽:我想等待线程启动,所以我等待条件变量。然后,一旦新的线程在main方法中启动,我再次等待相同的条件变量以允许初始线程继续。然后一旦初始线程的启动方法退出,新线程就可以继续。这个可以吗?

2)是否有任何情况下,线程不会被操作系统成功启动?我记得在某处可能会发生这种情况。如果可以的话,我宁愿在条件变量上进行定时等待(正如start方法中注释的那样)?我知道模板类无法正确地实现停止方法,即如果事件循环无法停止,代码将阻止连接(在停止或在析构函数中),但是我看不到这个。我猜这取决于该课程的用户,以确保启动和停止方法正确实施?

4)我将不胜感激任何其他设计错误,改进等?

谢谢!

1)经过大量的测试中使用的条件变量的优良似乎

2)这个问题还没有冒出了(还)

3)模板类:

回答

0

下列终于尘埃落定实施必须满足的要求,单元测试用于测试 正确性

4)改进

  • 添加锁
  • 在捕获产生的线程异常,在主线程中重新抛出,以避免崩溃并没有松动的异常信息加入
  • 使用boost ::系统:: ERROR_CODE沟通错误代码回呼叫者
  • 实施对象设置能够

代码:

template <typename T> 
class IService : private boost::noncopyable 
{ 
    typedef boost::shared_ptr<boost::thread> thread_ptr; 
    typedef T ServiceImpl; 
public: 
    typedef boost::shared_ptr<IService<T> > ptr; 

    IService() 
    :m_pServiceObject(&m_serviceObject) 
    { 
    } 

    ~IService() 
    { 
    /// try stop the service in case it's running 
    if (m_pServiceThread && m_pServiceThread->joinable()) 
    { 
     stop(); 
    } 
    } 

    static ptr create() 
    { 
    return boost::make_shared<IService<T> >(); 
    } 

    /// Accessor to service implementation. The handle can be used to configure the implementation object 
    ServiceImpl& get() { return m_serviceObject; } 
    /// Mutator to service implementation. The handle can be used to configure the implementation object 
    void set(ServiceImpl rServiceImpl) 
    { 
    // the implementation object cannot be modified once the thread has been created 
    assert(m_pServiceThread == 0); 
    m_serviceObject = rServiceImpl; 
    m_pServiceObject = &m_serviceObject; 
    } 

    void set(ServiceImpl* pServiceImpl) 
    { 
    // the implementation object cannot be modified once the thread has been created 
    assert(m_pServiceThread == 0); 

    // make sure service object is valid 
    if (pServiceImpl) 
     m_pServiceObject = pServiceImpl; 
    } 

    /// if the service implementation reports an error from the start or stop method call, it can be accessed via this method 
    /// NB: only the last error can be accessed 
    boost::system::error_code getServiceErrorCode() const { return m_ecService; } 

    /// The join method allows the caller to block until thread completion 
    void join() 
    { 
    // protect this method from being called twice (e.g. by user and by stop) 
    boost::mutex::scoped_lock lock(m_joinMutex); 
    if (m_pServiceThread && m_pServiceThread->joinable()) 
    { 
     m_pServiceThread->join(); 
     m_pServiceThread.reset(); 
    } 
    } 

    /// This method launches the non-blocking service 
    boost::system::error_code start() 
    { 
    boost::mutex::scoped_lock lock(m_threadMutex); 

    if (m_pServiceThread && m_pServiceThread->joinable()) 
    { 
     // already running 
     return boost::system::error_code(SHARED_INVALID_STATE, shared_category); 
    } 

    m_pServiceThread = thread_ptr(new boost::thread(boost::bind(&IService2::main, this))); 
    // Wait for condition to be signaled 
    m_startCondition.wait(m_threadMutex); 

    // notify main to continue: it's blocked on the same condition var 
    m_startCondition.notify_one(); 
    // No error 
    return boost::system::error_code(); 
    } 

    /// This method stops the non-blocking service 
    boost::system::error_code stop() 
    { 
    // trigger the stopping of the event loop 
    //boost::system::error_code ec = m_serviceObject.stop(); 
    assert(m_pServiceObject); 
    boost::system::error_code ec = m_pServiceObject->stop(); 
    if (ec) 
    { 
     m_ecService = ec; 
     return ec; 
    } 

    // The service implementation can return an error code here for more information 
    // However it is the responsibility of the implementation to stop the service event loop (if running) 
    // Failure to do so, will result in a block 
    // If this occurs in practice, we may consider a timed join? 
    join(); 

    // If exception was thrown in new thread, rethrow it. 
    // Should the template implementation class want to avoid this, it should catch the exception 
    // in its start method and then return and error code instead 
    if(m_exception) 
     boost::rethrow_exception(m_exception); 

    return ec; 
    } 

private: 
    /// runs in it's own thread 
    void main() 
    { 
    try 
    { 
     boost::mutex::scoped_lock lock(m_threadMutex); 
     // notify main thread that it can continue 
     m_startCondition.notify_one(); 
     // Try Dummy wait to allow 1st thread to resume 
     m_startCondition.wait(m_threadMutex); 

     // call implementation of event loop 
     // This will block 
     // In scenarios where the service fails to start, the implementation can return an error code 
     m_ecService = m_pServiceObject->start(); 

     m_exception = boost::exception_ptr(); 
    } 
    catch (...) 
    { 
     m_exception = boost::current_exception(); 
    } 
    } 

    /// Service thread 
    thread_ptr m_pServiceThread; 
    /// Thread mutex 
    mutable boost::mutex m_threadMutex; 
    /// Join mutex 
    mutable boost::mutex m_joinMutex; 
    /// Condition for signaling start of thread 
    boost::condition m_startCondition; 

    /// T must satisfy the implicit service interface and provide a start and a stop method 
    T m_serviceObject; 
    T* m_pServiceObject; 
    // Error code for service implementation errors 
    boost::system::error_code m_ecService; 

    // Exception ptr to transport exception across different threads 
    boost::exception_ptr m_exception; 
}; 

进一步反馈/批评当然会受到欢迎。