2012-03-30 97 views
2

我有一些代码,我修改了(几乎没有改变它)从C++并发在行动书,所以会期望它的工作 - 只有它不。 我一直试图做的是实现一个线程安全的队列,我可以再为 线程或线程商店后台作业队列是这样的:使用condition_variable与unique_lock导致定期崩溃(GCC 4.7,OSX)

queue.h

#pragma once 
#include "imgproc/i_queue.h" 
#include <memory> 
#include <thread> 
#include <queue> 
#include <mutex> 
#include <condition_variable> 

using namespace std; 

namespace imgproc { 

    /* Blocking, concurrency-safe queue. The method that blocks is pop(), 
    * which makes the current thread wait until there is a value to pop 
    * from the queue. 
    */ 
    template <typename T> 
    struct ConcurrentQueue : public IQueueWriter<T>, public IQueueReader<T> 
    { 
     ConcurrentQueue() {} 
     ConcurrentQueue(const ConcurrentQueue &) = delete; 
     ConcurrentQueue & operator= (const ConcurrentQueue &) = delete; 

     /* Concurrency-safe push to queue. 
     */ 
     virtual void push(shared_ptr<T> val) 
     { 
     lock_guard<mutex> lk(_mutex); 
     _queue.push(val); 
     _cvar.notify_one(); 
     } 

     /* Concurrency-safe check if queue empty. 
     */ 
     virtual const bool empty() const 
     { 
     lock_guard<mutex> lk(_mutex); 
     bool result(_queue.empty()); 
     return result; 
     } 

     /* Waiting, concurrency-safe pop of value. If there are no values in 
     * the queue, then this method blocks the current thread until there 
     * are. 
     */ 
     virtual shared_ptr<T> pop() 
     { 
     unique_lock<mutex> lk(_mutex); 
     _cvar.wait(lk, [ this ] {return ! _queue.empty(); }); 
     auto value(_queue.front()); 
     _queue.pop(); 
     return value; 
     } 

    private: 
     mutable mutex _mutex; 
     queue<shared_ptr<T>> _queue; 
     condition_variable _cvar; 
    }; 

} 

我的理解是,那里的互斥体应该保护所有访问队列的企图。但是,我有一个测试崩溃在10约1时间:

测试是 - 崩溃 - fragment.cpp

// Should have threads wait until there is a value to pop 
TEST_F(ConcurrentQueueTest, 
     ShouldHaveThreadsWaitUntilThereIsAValueToPop) { 
    int val(-1); 
    thread t1([ this, &val ] { 
     for (uint i(0) ; i < 1000 ; ++i); 
     val = *_r_queue->pop(); 
    }); 
    for (uint i(0) ; i < 1000 ; ++ i) { 
    for (uint j(0) ; j < 1000 ; ++ j); 
    EXPECT_EQ(-1, val); 
    } 
    _w_queue->push(make_shared<int>(27)); 
    t1.join(); 
    EXPECT_EQ(27, val); 
    EXPECT_TRUE(_r_queue->empty()); 
} 

变量_r_queue_w_queue只是接口在同一ConcurrentQueue例如,在这里。

从痴迷于调试信息的小时数开始,看起来pop()的调用是导致崩溃的原因,当_queue member实例变量为空时,总是(我见过)。 任何人都可以给我任何意见,我做错了什么,在这里?我已经看到其他职位寻求类似问题的帮助,但他们似乎总是说条件变量是答案 - 我正在尝试!

或者,也许一些建议,我可以更好地调试,以帮助我解决它? FWIW,我试着手动执行一个while,其中有一个sleep(1),它仍然会定期崩溃,这表明尽管我尽了最大的努力,我仍然得到了竞争条件 - 只有我真的看不到它。

非常感谢任何&所有的帮助,我保证我已经试图解决这个问题之前打扰你所有。

干杯, Doug。

+0

嗨 - 有关更多信息,我已将违规代码提取到单个主文件中。该程序在编译时应该(我认为)只是退出,但它实际上挂在t.join()的最后一个调用上。主文件在这里:https://gist.github.com/2396866 – biot023 2012-04-16 07:09:47

回答

1

通过阅读https://gist.github.com/2396866我确定这个问题是测试//应该能够同时弹出值”。两个线程创建,然后分离,两者无限期地pop'ing队列,即使在测试结束这会影响最后的测试(其中的问题似乎是)

对于一个快速的解决办法是:。

/* ... */ 

{ // Should be able to concurrently pop values 
    for (uint i(0) ; i < 100 ; ++ i) 
    q.push(make_shared<string>("Monty Halfwit")); 

    pair<uint, uint> counts(0, 0); 
    thread t1([ & ] { 
    while (++counts.first != 50) { 
     this_thread::sleep_for(chrono::milliseconds(1)); 
     q.pop(); 
    } 
    }); 

    thread t2([ & ] { 
    while (++counts.second != 50) { 
     this_thread::sleep_for(chrono::milliseconds(1)); 
     q.pop(); 
    } 
    }); 

    t1.detach(); 
    t2.detach(); 

/* ... */ 

这将使当他们弹出每50串线程死亡。