2016-09-15 143 views
0

我使用这个类的生产者 - 消费者安装在C++:C++线程安全的队列关机

#pragma once 

#include <queue> 
#include <mutex> 
#include <condition_variable> 
#include <memory> 
#include <atomic> 

template <typename T> class SafeQueue 
{ 
public: 
    SafeQueue() : 
    _shutdown(false) 
    { 

    } 

    void Enqueue(T item) 
    { 
     std::unique_lock<std::mutex> lock(_queue_mutex); 
     bool was_empty = _queue.empty(); 
     _queue.push(std::move(item)); 
     lock.unlock(); 

     if (was_empty) 
      _condition_variable.notify_one(); 
    } 

    bool Dequeue(T& item) 
    { 
     std::unique_lock<std::mutex> lock(_queue_mutex); 

     while (!_shutdown && _queue.empty()) 
      _condition_variable.wait(lock); 

     if(!_shutdown) 
     { 
      item = std::move(_queue.front()); 
      _queue.pop(); 

      return true; 
     } 

     return false; 
    } 

    bool IsEmpty() 
    { 
     std::lock_guard<std::mutex> lock(_queue_mutex); 
     return _queue.empty(); 
    } 

    void Shutdown() 
    { 
     _shutdown = true; 
     _condition_variable.notify_all(); 
    } 

private: 
    std::mutex _queue_mutex; 
    std::condition_variable _condition_variable; 
    std::queue<T> _queue; 
    std::atomic<bool> _shutdown; 
}; 

我用它是这样的:

class Producer 
{ 
public: 
    Producer() : 
     _running(true), 
     _t(std::bind(&Producer::ProduceThread, this)) 
    { } 

    ~Producer() 
    { 
     _running = false; 
     _incoming_packets.Shutdown(); 
     _t.join(); 
    } 

    SafeQueue<Packet> _incoming_packets; 

private: 
    void ProduceThread() 
    { 
     while(_running) 
     { 
      Packet p = GetNewPacket(); 
      _incoming_packets.Enqueue(p); 
     } 
    } 

    std::atomic<bool> _running; 
    std::thread _t; 
} 

class Consumer 
{ 
    Consumer(Producer* producer) : 
     _producer(producer), 
     _t(std::bind(&Consumer::WorkerThread, this)) 
    { } 

    ~Consumer() 
    { 
     _t.join(); 
    } 

private: 
    void WorkerThread() 
    { 
     Packet p; 

     while(producer->_incoming_packets.Dequeue(p)) 
      ProcessPacket(p); 
    } 

    std::thread _t; 
    Producer* _producer; 
} 

这工作的时间。但在同时,一旦当我删除生产者(并引起它的析构函数调用SafeQueue::Shutdown,在_t.join()块永远

我的猜测是,这个问题是这里(SafeQueue::Dequeue):

while (!_shutdown && _queue.empty()) 
     _condition_variable.wait(lock); 

SafeQueue::Shutdown从线#1被调用,而线#2检查完_Shutdown但在此之前它执行_condition_variable.wait(lock),因此“缺失”的notify_all()。能这样呢?

如果这是问题,解决问题的最佳方法是什么?

+0

覆盖您是否打开了最高级别的警告?您的消费者类中有一个微妙的错误。您的数据成员的顺序以及您打算在构造函数冲突中初始化它们的顺序....检查它。该线程将在分配_producer之前创建。 – WhiZTiM

+0

同样,您错误地使用了条件变量......它不应该像这样处于while循环中。它有一个允许这样的测试的过载 – WhiZTiM

+0

@WhiZTiM我知道它有这样的过载,但它相当于一个while循环:http://en.cppreference.com/w/cpp/thread/condition_variable/wait。 – UnTraDe

回答

1

由于SafeQueue对象为生产者所有,因此当生产者完成时,删除生产者会导致通知的消费者和被删除的SafeQueue之间的竞争条件被删除。

我建议共享资​​源既不是由生产者也不是由消费者拥有,而是作为引用传递给每个构造器。

更改生产者和消费者构造函数;

Producer(SafeQueue<Packet> & queue) : 
    _running(false), _incoming_packets(queue) {} 


Consumer(SafeQueue<Packet> & queue) : 
    _running(false), _incoming_packets(queue) {} 

以这种方式使用您的实例;

SafeQueue<Packet> queue; 
Producer producer(queue); 
Consumer consumer(queue); 

...do stuff... 

queue.shutdown(); 

这也解决了您在Consumer类中与Producer类紧密耦合的糟糕设计问题。

另外,在析构函数中杀死和连接线程可能是个坏主意,就像你为〜Producer做的那样。最好为每个线程类添加一个Shutdown()方法,并明确地调用它们;

producer.shutdown(); 
consumer.shutdown(); 
queue.shutdown(); 

关机顺序并不重要,除非你是担心失去仍在队列中,当你停止消费未处理的分组。

+0

为什么在解构器中加入线程是一个糟糕的设计?它与RAII不冲突吗? – UnTraDe

+0

@UnTraDe,创建和销毁将是RAII,启动和停止是程序控制。但是,join可以抛出异常,这意味着正确的处理需要更多的析构函数中的逻辑,这绝对与RAII冲突。 – CAB

0

在您的SafeQueue::Dequeue中,您可能正在使用std::condition_variable错误的方式...更改此:

bool Dequeue(T& item) 
{ 
    std::unique_lock<std::mutex> lock(_queue_mutex); 

    while (!_shutdown && _queue.empty()) 
     _condition_variable.wait(lock); 
    if(!_shutdown) 
    { 
     item = std::move(_queue.front()); 
     _queue.pop(); 
     return true; 
    } 
    return false; 
} 

bool Dequeue(T& item) 
{ 
    std::unique_lock<std::mutex> lock(_queue_mutex); 
    _condition_variable.wait(lock, []{ return _shutdown || !_queue.empty() }); 
    if(!_shutdown) 
    { 
     item = std::move(_queue.front()); 
     _queue.pop(); 
     return true; 
    } 

    return false; 
} 

其次,Consumer数据成员的初始化顺序是不正确的有关于它的构造

class Consumer 
{ 
    Consumer(Producer* producer) : 
     _producer(producer), 
     _t(std::bind(&Consumer::WorkerThread, this)) 
    { } 
    ...... 
    // _t will be constructed first, regardless of your constructor initializer list 
    // Meaning, the thread can even start running using an unintialized _producer 
    std::thread _t;  
    Producer* _producer; 
} 

应该重新排序到:

class Consumer 
{ 
    Consumer(Producer* producer) : 
     _producer(producer), 
     _t(std::bind(&Consumer::WorkerThread, this)) 
    { } 
    ...... 
    Producer* _producer; 
    std::thread _t;  
} 

您的问题的另一部分由CAB's answer

+0

原始'while(!pred)condvar.wait(lock);'代码是正确的,你的改变是错误的,因为当传递一个谓词时,等待谓词为真时停止,所以它应该是'_shutdown || !_queue.empty()' – stefaanv

+0

@stefaanv,Oopsie。纠正。非常感谢 – WhiZTiM