我想使用boost线程库实现一个带有条件变量的同步队列,就像这里的示例 - >(ImplementingThreadSafeQueue)。boost在C++条件变量中的同步队列不会通知其他线程上的等待类方法
背景/目的:我正在撰写windows服务作为高级设计项目的一部分。在整个服务中,我希望有各种级别的日志记录(包括文件和Windows事件查看器),还使用我自己的“CreateTimerQueueTimer”函数的“EventTimer”包装来创建定时事件,例如服务报告心跳。我的想法是将消息对象推送到同步队列中,并让记录类在其自己的线程上观察队列,等待执行各种日志记录任务。为了简单起见,我现在只用字符串进行测试。
问题:记录器线程在属于记录类的方法上运行,以从队列中获取工作项。如果我从类外部将东西推到队列上,可以从EventTimer线程或甚至主线程中说出,记录器永远不会收到队列中新项目的通知。但是,如果我创建属于记录器类的两个线程并使用其中一个线程将某些内容推送到队列中,则记录器将看到它并作出响应。我希望有任何线程能够添加东西到队列中,并通知记录器新项目。
我的代码如下。任何帮助,将不胜感激。感谢您的时间!
同步队列代码
#ifndef _SYNCHRONIZED_QUEUE_
#define _SYNCHRONIZED_QUEUE_
// Include Files
#include <boost\noncopyable.hpp>
#include <boost\thread.hpp>
#include <queue>
namespace GSMV
{
///////////////////////////////////////////////////////////////////////////////////////
/// Class: SynchronizedQueue
///
/// @Brief
/// SynchronizedQueue is a thread safe STL Queue wrapper that waits on Dequeue and
/// notifies a listening thread on Enqueue. It is not copyable.
///////////////////////////////////////////////////////////////////////////////////////
template <typename T>
class SynchronizedQueue : private boost::noncopyable
{
public:
struct Canceled{};
///////////////////////////////////////////////////////////////////////////////////////
/// Function: Constructor
///
/// @Brief
/// Default constructor for the SynchronizedQueue object.
///////////////////////////////////////////////////////////////////////////////////////
SynchronizedQueue(void)
{
// Queue is not canceled to start with
this->mCanceled = false;
// Nobody waiting yet
this->mWaiting = 0;
}
///////////////////////////////////////////////////////////////////////////////////////
/// Function: Enqueue
///
/// @Param const T &item: Item of type T to add to queue.
///
/// @Brief
/// Adds an item of type T to the queue notifying via a condition.
///////////////////////////////////////////////////////////////////////////////////////
void Enqueue(const T &item)
{
bool enqueued = false;
// acquire lock on the queue
boost::unique_lock<boost::mutex> lock(this->mMutex);
// make sure the queue is not canceled
if (this->mCanceled)
throw Canceled();
// add item to the queue
this->mQueue.push(item);
// notify others that queue has a new item
this->mItemAvailable.notify_one();
}
///////////////////////////////////////////////////////////////////////////////////////
/// Function: Dequeue
///
/// @Return
/// Item of type T from front of queue.
///
/// @Brief
/// Returns an item of type T from the queue and deletes the front of the queue. Thread
/// will wait on an empty queue until it is signaled via Enqueue.
///////////////////////////////////////////////////////////////////////////////////////
T Dequeue(void)
{
// acquire lock on the queue
boost::unique_lock<boost::mutex> lock(this->mMutex);
// make sure the queue is not canceled
if (this->mCanceled)
throw Canceled();
// one more thread is waiting on this item
++this->mWaiting;
// if the queue is empty, wait until an item is added
// lock is released inside the wait
// lock is re-acquired after the wait
while (this->mQueue.empty())
this->mItemAvailable.wait(lock);
// the thread is done waiting now
--this->mWaiting;
// retrieve and remove the item from the queue
T item = this->mQueue.front();
this->mQueue.pop();
return item;
// lock is released
}
///////////////////////////////////////////////////////////////////////////////////////
/// Function: GetSize
///
/// @Return
/// The current size of the queue (number of items in the queue).
///
/// @Brief
/// Returns the number of items contained in the queue.
///////////////////////////////////////////////////////////////////////////////////////
int GetSize(void)
{
// acquire lock on the queue
boost::unique_lock<boost::mutex> lock(this->mMutex);
// make sure the queue is not canceled
if (this->mCanceled)
throw Canceled();
return this->mQueue.size();
// lock is released
}
///////////////////////////////////////////////////////////////////////////////////////
/// Function: IsEmpty
///
/// @Return
/// Boolean queue is empty.
///
/// @Brief
/// Returns true if queue is empty false otherwise.
///////////////////////////////////////////////////////////////////////////////////////
bool IsEmpty(void)
{
// acquire lock on the queue
boost::unique_lock<boost::mutex> lock(this->mMutex);
// make sure the queue is not canceled
if (this->mCanceled)
throw Canceled();
return this->mQueue.empty();
// lock is released
}
void Cancel(void)
{
// acquire lock on the queue
boost::unique_lock<boost::mutex> lock(this->mMutex);
// make sure the queue is not canceled
if (this->mCanceled)
throw Canceled();
this->mCanceled = true;
// notify all others that queue has a new item
this->mItemAvailable.notify_all();
while (0 < this->mWaiting)
this->mItemAvailable.wait(lock);
}
void Reset(void)
{
// acquire lock on the queue
boost::unique_lock<boost::mutex> lock(this->mMutex);
// reset the canceled arguement
this->mCanceled = false;
}
private:
bool mCanceled;
int mWaiting;
std::queue<T> mQueue; // the STL Queue
boost::mutex mMutex; // the mutex object
boost::condition_variable mItemAvailable; // the signal condition
};
} // Namespace GSMV
#endif /// _SYNCHRONIZED_QUEUE_
记录器代码
#ifndef _LOGGER_H_
#define _LOGGER_H_
#include "SynchronizedQueue.h"
#include <string>
#include <boost\thread.hpp>
namespace GSMV
{
static SynchronizedQueue<std::string> logQ;
class Logger
{
public:
Logger(void);
~Logger(void);
bool Start(void);
bool Stop(void);
bool IsRunning(void) const;
void LoggerWorkThread(void);
private:
boost::thread* mpLoggerThread;
};
} // Namespace GSMV
#endif
// FILE END - logger.h //
#include "Logger.h"
using namespace GSMV;
Logger::Logger(void)
{
this->mpLoggerThread = NULL;
}
Logger::~Logger(void)
{
this->Stop();
}
bool Logger::Start(void)
{
bool started = this->IsRunning();
if (!started)
{
this->mpLoggerThread = new boost::thread(&Logger::LoggerWorkThread, this);
started = (NULL != this->mpLoggerThread);
}
return started;
}
bool Logger::Stop(void)
{
bool stopped = !this->IsRunning();
if (!stopped)
{
this->mpLoggerThread->interrupt();
this->mpLoggerThread->join();
delete this->mpLoggerThread;
this->mpLoggerThread = NULL;
stopped = true;
}
return stopped;
}
bool Logger::IsRunning(void) const
{
return (NULL != this->mpLoggerThread);
}
void Logger::LoggerWorkThread(void)
{
std::cout << "Enter Logger Work Thread\n" << std::endl;
while (this->IsRunning())
{
std::cout << "LOG: wait for Q..." << std::endl;
std::string s = logQ.Dequeue();
std::cout << "LOG: Got item! => " << s << std::endl;
boost::this_thread::interruption_point();
}
std::cout << "Exit Logger Work Thread\n" << std::endl;
}
因此,使用上面的代码我将创建一个记录器对象,并调用Start()方法。理想情况下,它会启动一个循环的新线程,检查队列中的字符串项,直到调用Stop()方法。所以回到我的主函数中,我可以将字符串推入队列,记录器应该得到它们,但记录器永远不会收到通知。如果这很重要,队列在Logger头文件中声明为“static SynchronizedQueue logQ”。 再次,我希望这里有任何建议。谢谢!
嗯,你是对的!我用“lock.unlock()”解锁了互斥锁。然而,我仍然遇到同样的行为...... – akagixxer 2012-01-30 19:02:36
你应该重新设计你的队列:第一眼,我想知道在线程退出时或错误情况下应该返回什么'Dequeue()? std :: queue's'pop()'不返回值是有很好的理由的!那么:重新设计后,你的错误可能会变得清晰。 – Frunsi 2012-01-31 22:44:09
那么..你应该重新读你的示例源队列实现(http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html) - 它包含答案。另外:你添加一个'mCanceled'标志似乎并不适合队列本身。 – Frunsi 2012-01-31 22:50:41