2013-03-10 117 views
2

我有一个快速生产者插入队列中的数据和慢速消费者消费数据的用例。我面临的问题是随着时间的推移队列大小不断增加。我有一个类实现,其中一个std ::队列被互斥锁和条件变量保护,用于并发读写。快速生产者和缓慢消费者

这怎么适应这种情况,生产者在达到MAX_THRESHOLD之后,直到停止插入数据到队列中,并且消费者已经消耗了生产者将数据插入队列的一些数据信号。

有人可以提供一个示例实现吗?

此外,在不改变类实现的情况下,是否可以通过在生产者和消费者中添加另一层同步来解决此问题?

+0

显示你的代码。 – 2013-03-10 20:55:32

+0

另请参阅http://programmers.stackexchange.com/q/244826/24257 – Pacerier 2015-09-14 11:33:14

回答

1

代码片段:

#include <queue> 
#include <pthread.h> 

template <class T, size_t UpperLimit> 
class BoundedQueue { 
    std::queue<T> q_; 
    pthread_mutex_t mtx_; 
    pthread_cond_t cv_not_empry_; 
    pthread_cond_t cv_not_full_; 

    // lock/unlock helper 
    struct auto_locker { 
    auto_locker(pthread_mutex_t* pm) : pm_(pm) 
     { pthread_mutex_lock(pm_); } 
    ~auto_locker() 
     { pthread_mutex_unlock(pm_);} 
    pthread_mutex_t *pm_; 
    }; 

public: 
    BoundedQueue() { /* initialize member... */ } 
    ~BoundedQueue() { /* uninitialize member...*/ } 
    // for Producer 
    void push(T x) { 
    auto_locker lk(&mtx_); 
    while (UpperLimit <= q_.size()) { 
     pthread_cond_wait(&cv_not_full_, &mtx_); 
    } 
    q_.push(x); 
    pthread_cond_broadcast(&cv_not_empry_); 
    return ret; 
    } 
    // for Consumer 
    T pop() { 
    auto_locker lk(&mtx_); 
    while (q_.empty()) { 
     pthread_cond_wait(&cv_not_empry_, &mtx_); 
    } 
    T ret = q_.front(); 
    q_.pop(); 
    pthread_cond_broadcast(&cv_not_full_); 
    return ret; 
    } 
} 
4

或者:

a)使用一个有界队列类块如果队列大小达到MAX_THRESHOLD生产者。这意味着更改您可能不需要的队列类。

b)使用“池队列” - 另一个无限制的阻塞队列,在启动时填充MAX_THRESHOLD对象。生产者从池中获取它的对象,加载它们,排队到生产者。生产者从消费者那里获取对象,“消费”它们并将它们返回到池中。这种有点使用指针或可能引用,你可能不希望。

c)使用以MAX_THRESHOLD计数初始化的信号量以类似于(b)的方式表示消息标记 - 生产者在排队之前必须获取单元,消费者在完成消息时发布单元目的。

我倾向于使用(b)。与并行线程 “界队列” 的

+0

您是否在介绍第二种方法 - “泳池队列”? – Pacerier 2015-09-14 11:28:24