2008-10-15 51 views
6

我想实现的是服从它大致接口生产者/消费者方案:如何实现阻塞读取使用POSIX线程

class Consumer { 
private: 
    vector<char> read(size_t n) { 
     // If the internal buffer has `n` elements, then dequeue them 
     // Otherwise wait for more data and try again 
    } 
public: 
    void run() { 
     read(10); 
     read(4839); 
     // etc 
    } 
    void feed(const vector<char> &more) { 
     // Safely queue the data 
     // Notify `read` that there is now more data 
    } 
}; 

在这种情况下,feedrun将在单独的线程和read运行应是一个阻止读取(如recvfread)。很显然,我需要在我的双壳牌上进行某种互斥,我需要某种通知系统来通知read再试一次。

我听到条件变量是要走的路,但我所有的多线程的经验在于与Windows和我有一个很难包装我的头周围。

感谢您的帮助!

(是的,我知道这是低效的返回向量。让我们不要进入该)

回答

8

此代码尚未准备好生产。 对任何库调用的结果都没有进行错误检查。

我已经在LockThread中封装了互斥锁的锁定/解锁,因此它是异常安全的。但就是这样。

另外,如果我认真对待这个问题,我会在对象中包含互斥锁和条件变量,以便它们可以在其他消费者方法中被滥用。但只要你注意到在使用条件变量(以任何方式)之前必须获取锁定,那么这种简单的情况可以保持原样。

出于兴趣,你检查了增强线程库吗?

#include <iostream> 
#include <vector> 
#include <pthread.h> 

class LockThread 
{ 
    public: 
    LockThread(pthread_mutex_t& m) 
     :mutex(m) 
    { 
     pthread_mutex_lock(&mutex); 
    } 
    ~LockThread() 
    { 
     pthread_mutex_unlock(&mutex); 
    } 
    private: 
     pthread_mutex_t& mutex; 
}; 
class Consumer 
{ 
    pthread_mutex_t  lock; 
    pthread_cond_t  cond; 
    std::vector<char> unreadData; 
    public: 
    Consumer() 
    { 
     pthread_mutex_init(&lock,NULL); 
     pthread_cond_init(&cond,NULL); 
    } 
    ~Consumer() 
    { 
     pthread_cond_destroy(&cond); 
     pthread_mutex_destroy(&lock); 
    } 

    private: 
     std::vector<char> read(size_t n) 
     { 
      LockThread locker(lock); 
      while (unreadData.size() < n) 
      { 
       // Must wait until we have n char. 
       // This is a while loop because feed may not put enough in. 

       // pthread_cond() releases the lock. 
       // Thread will not be allowed to continue until 
       // signal is called and this thread reacquires the lock. 

       pthread_cond_wait(&cond,&lock); 

       // Once released from the condition you will have re-aquired the lock. 
       // Thus feed() must have exited and released the lock first. 
      } 

      /* 
      * Not sure if this is exactly what you wanted. 
      * But the data is copied out of the thread safe buffer 
      * into something that can be returned. 
      */ 
      std::vector<char> result(n); // init result with size n 
      std::copy(&unreadData[0], 
         &unreadData[n], 
         &result[0]); 

      unreadData.erase(unreadData.begin(), 
          unreadData.begin() + n); 
      return (result); 
     } 
public: 
    void run() 
    { 
     read(10); 
     read(4839); 
     // etc 
    } 
    void feed(const std::vector<char> &more) 
    { 
     LockThread locker(lock); 

     // Once we acquire the lock we can safely modify the buffer. 
     std::copy(more.begin(),more.end(),std::back_inserter(unreadData)); 

     // Only signal the thread if you have the lock 
     // Otherwise race conditions happen. 
     pthread_cond_signal(&cond); 

     // destructor releases the lock and thus allows read thread to continue. 
    } 
}; 


int main() 
{ 
    Consumer c; 
} 
1

我会扔了一些半伪代码。这里是我的意见:

1)在这里非常大的晶粒锁定。如果你需要更快的访问,你会想重新考虑你的数据结构。 STL不是线程安全的。

2)锁将阻塞,直到互斥量让它通过。互斥体结构是,它可以让锁定/解锁机制一次穿过它。无需轮询或某种异常类型的结构。

3)这个问题在语法上很有效。我对API和C++语法并不精确,但我相信它提供了一个语义上正确的解决方案。

4)编辑回应评论。

class piper 
{ 
pthread_mutex queuemutex; 
pthread_mutex readymutex; 
bool isReady; //init to false by constructor 

//whatever else 
}; 

piper::read() 
{//whatever 
pthread_mutex_lock(&queuemutex) 
if(myqueue.size() >= n) 
{ 
    return_queue_vector.push_back(/* you know what to do here */) 

    pthread_mutex_lock(&readymutex) 
    isReady = false; 
    pthread_mutex_unlock(&readymutex) 
} 
pthread_mutex_unlock(&queuemutex) 
} 

piper::push_em_in() 
{ 
//more whatever 
pthread_mutex_lock(&queuemutex) 
//push push push 
if(myqueue.size() >= n) 
{ 
    pthread_mutex_lock(&readymutex) 
    isReady = true; 
    pthread_mutex_unlock(&readymutex) 
} 
pthread_mutex_unlock(&queuemutex) 
} 
+0

好的开始,但请记住我希望我的阅读成功。不能保证`push_em_in`会保存足够的数据以便发生。所以阅读需要等到足够了。这是我想确保高效(不旋转)的循环。 – 2008-10-15 23:19:09

+0

您也可以使用RAII来确保您的锁()unlock()是异常安全的。 – 2008-10-15 23:20:30

+0

@Frank对这个概念采取了另一种方式。你现在更好地学习如何更好地使用pthread mutex吗? – 2008-10-15 23:24:41

2

我倾向于使用我称之为“同步队列”的东西。我换了正常的队列,并使用Semaphore类两个锁,使读取块,就像你的愿望:

#ifndef SYNCQUEUE_20061005_H_ 
#define SYNCQUEUE_20061005_H_ 

#include <queue> 
#include "Semaphore.h" 

// similar, but slightly simpler interface to std::queue 
// this queue implementation will serialize pushes and pops 
// and block on a pop while empty (as apposed to throwing an exception) 
// it also locks as neccessary on insertion and removal to avoid race 
// conditions 

template <class T, class C = std::deque<T> > class SyncQueue { 
protected: 
    std::queue<T, C> m_Queue; 
    Semaphore   m_Semaphore; 
    Mutex    m_Mutex; 

public: 
    typedef typename std::queue<T, C>::value_type value_type; 
    typedef typename std::queue<T, C>::size_type size_type; 

    explicit SyncQueue(const C& a = C()) : m_Queue(a), m_Semaphore(0) {} 

    bool empty() const    { return m_Queue.empty(); } 
    size_type size() const   { return m_Queue.size(); } 

    void push(const value_type& x); 
    value_type pop(); 
}; 

template <class T, class C> 
void SyncQueue<T, C>::push(const SyncQueue<T, C>::value_type &x) { 
    // atomically push item 
    m_Mutex.lock(); 
    m_Queue.push(x); 
    m_Mutex.unlock(); 

    // let blocking semaphore know another item has arrived 
    m_Semaphore.v(); 
} 

template <class T, class C> 
typename SyncQueue<T, C>::value_type SyncQueue<T, C>::pop() { 
    // block until we have at least one item 
    m_Semaphore.p(); 

    // atomically read and pop front item 
    m_Mutex.lock(); 
    value_type ret = m_Queue.front(); 
    m_Queue.pop(); 
    m_Mutex.unlock(); 

    return ret; 
} 

#endif 

您可以实现信号量和互斥,在你的线程执行相应的原语。

注意:这个实现是一个队列中单个元素的例子,但是你可以很容易地用一个缓存结果的函数来包装它,直到N被提供。像这样的东西,如果它是一个字符队列:

std::vector<char> func(int size) { 
    std::vector<char> result; 
    while(result.size() != size) { 
     result.push_back(my_sync_queue.pop()); 
    } 
    return result; 
} 
1

只是为了好玩,这里是一个快速和肮脏的实施使用Boost。它在支持它的平台上使用pthreads,而在windows上使用windows操作。

boost::mutex access; 
boost::condition cond; 

// consumer 
data read() 
{ 
    boost::mutex::scoped_lock lock(access); 
    // this blocks until the data is ready 
    cond.wait(lock); 

    // queue is ready 
    return data_from_queue(); 
} 

// producer 
void push(data) 
{ 
    boost::mutex::scoped_lock lock(access); 
    // add data to queue 

    if (queue_has_enough_data()) 
    cond.notify_one(); 
} 
1

更好玩,这里是我的最终版本。 STL没有很好的理由。 :-)

#include <algorithm> 
#include <deque> 
#include <pthread.h> 

template<typename T> 
class MultithreadedReader { 
    std::deque<T> buffer; 
    pthread_mutex_t moreDataMutex; 
    pthread_cond_t moreDataCond; 

protected: 
    template<typename OutputIterator> 
    void read(size_t count, OutputIterator result) { 
     pthread_mutex_lock(&moreDataMutex); 

     while (buffer.size() < count) { 
      pthread_cond_wait(&moreDataCond, &moreDataMutex); 
     } 
     std::copy(buffer.begin(), buffer.begin() + count, result); 
     buffer.erase(buffer.begin(), buffer.begin() + count); 

     pthread_mutex_unlock(&moreDataMutex); 
    } 

public: 
    MultithreadedReader() { 
     pthread_mutex_init(&moreDataMutex, 0); 
     pthread_cond_init(&moreDataCond, 0); 
    } 

    ~MultithreadedReader() { 
     pthread_cond_destroy(&moreDataCond); 
     pthread_mutex_destroy(&moreDataMutex); 
    } 

    template<typename InputIterator> 
    void feed(InputIterator first, InputIterator last) { 
     pthread_mutex_lock(&moreDataMutex); 

     buffer.insert(buffer.end(), first, last); 
     pthread_cond_signal(&moreDataCond); 

     pthread_mutex_unlock(&moreDataMutex); 
    } 
};