我使用这个类的生产者 - 消费者安装在C++:C++线程安全的队列关机
#pragma once
#include <queue>
#include <mutex>
#include <condition_variable>
#include <memory>
#include <atomic>
template <typename T> class SafeQueue
{
public:
SafeQueue() :
_shutdown(false)
{
}
void Enqueue(T item)
{
std::unique_lock<std::mutex> lock(_queue_mutex);
bool was_empty = _queue.empty();
_queue.push(std::move(item));
lock.unlock();
if (was_empty)
_condition_variable.notify_one();
}
bool Dequeue(T& item)
{
std::unique_lock<std::mutex> lock(_queue_mutex);
while (!_shutdown && _queue.empty())
_condition_variable.wait(lock);
if(!_shutdown)
{
item = std::move(_queue.front());
_queue.pop();
return true;
}
return false;
}
bool IsEmpty()
{
std::lock_guard<std::mutex> lock(_queue_mutex);
return _queue.empty();
}
void Shutdown()
{
_shutdown = true;
_condition_variable.notify_all();
}
private:
std::mutex _queue_mutex;
std::condition_variable _condition_variable;
std::queue<T> _queue;
std::atomic<bool> _shutdown;
};
我用它是这样的:
class Producer
{
public:
Producer() :
_running(true),
_t(std::bind(&Producer::ProduceThread, this))
{ }
~Producer()
{
_running = false;
_incoming_packets.Shutdown();
_t.join();
}
SafeQueue<Packet> _incoming_packets;
private:
void ProduceThread()
{
while(_running)
{
Packet p = GetNewPacket();
_incoming_packets.Enqueue(p);
}
}
std::atomic<bool> _running;
std::thread _t;
}
class Consumer
{
Consumer(Producer* producer) :
_producer(producer),
_t(std::bind(&Consumer::WorkerThread, this))
{ }
~Consumer()
{
_t.join();
}
private:
void WorkerThread()
{
Packet p;
while(producer->_incoming_packets.Dequeue(p))
ProcessPacket(p);
}
std::thread _t;
Producer* _producer;
}
这工作最的时间。但在同时,一旦当我删除生产者(并引起它的析构函数调用SafeQueue::Shutdown
,在_t.join()
块永远
我的猜测是,这个问题是这里(SafeQueue::Dequeue
):
while (!_shutdown && _queue.empty())
_condition_variable.wait(lock);
SafeQueue::Shutdown
从线#1被调用,而线#2检查完_Shutdown但在此之前它执行_condition_variable.wait(lock)
,因此“缺失”的notify_all()
。能这样呢?
如果这是问题,解决问题的最佳方法是什么?
覆盖您是否打开了最高级别的警告?您的消费者类中有一个微妙的错误。您的数据成员的顺序以及您打算在构造函数冲突中初始化它们的顺序....检查它。该线程将在分配_producer之前创建。 – WhiZTiM
同样,您错误地使用了条件变量......它不应该像这样处于while循环中。它有一个允许这样的测试的过载 – WhiZTiM
@WhiZTiM我知道它有这样的过载,但它相当于一个while循环:http://en.cppreference.com/w/cpp/thread/condition_variable/wait。 – UnTraDe