2012-02-22 182 views
2

这是一个经典的c/p问题,其中一些线程在其他线程读取数据时产生数据。生产者和消费者都共享一个常量大小的缓冲区。如果缓冲区为空,则消费者必须等待,如果缓冲区已满,则制片人必须等待。我正在使用信号量来跟踪全部或空的队列。制片人将减少免费斑点信号量,增加值,并增加填充槽信号量。所以我试图实现一个从生成器函数获取一些数字的程序,然后打印出数字的平均值。通过将此视为生产者 - 消费者问题,我试图节省执行该程序的一些时间。 generateNumber函数在进程中导致一些延迟,所以我想创建一些生成数字的线程,并将它们放入队列中。然后,运行主函数的“主线程”必须从队列中读取并求和,然后求平均值。所以这是我到目前为止有:C++中的消费者/生产者

#include <cstdio> 
#include <cstdlib> 
#include <time.h> 
#include "Thread.h" 
#include <queue> 

int generateNumber() { 
    int delayms = rand()/(float) RAND_MAX * 400.f + 200; 
    int result = rand()/(float) RAND_MAX * 20; 
    struct timespec ts; 
    ts.tv_sec = 0; 
    ts.tv_nsec = delayms * 1000000; 
    nanosleep(&ts, NULL); 
    return result; } 


struct threadarg { 
    Semaphore filled(0); 
    Semaphore empty(n); 
    std::queue<int> q; }; 


void* threadfunc(void *arg) { 
    threadarg *targp = (threadarg *) arg; 
    threadarg &targ = *targp; 
    while (targ.empty.value() != 0) { 
     int val = generateNumber(); 
     targ.empty.dec(); 
     q.push_back(val); 
     targ.filled.inc(); } 
} 
int main(int argc, char **argv) { 
    Thread consumer, producer; 
    // read the command line arguments 
    if (argc != 2) { 
     printf("usage: %s [nums to average]\n", argv[0]); 
     exit(1); } 
    int n = atoi(argv[1]); 
    // Seed random number generator 
    srand(time(NULL)); 
} 

我现在有点困惑,因为我不知道如何创建的生成数量多生产线(如果q是不充分),而消费者从阅读队列(即如果q不为空)。我不确定要把主要内容放在什么位置。 也在“Thread.h”中,您可以创建一个线程,一个互斥量或一个信号量。该线程具有.run(threadFunc,arg),.join()等方法。可以锁定或解锁互斥锁。信号量方法已经全部用于我的代码中。

+1

嗨丹,你没有接受任何给你的答案。请给社区一些答案以回答你的问题。 – 2012-02-22 14:03:32

+0

我很抱歉,我甚至没有意识到这是一个选项,直到现在!我接受了我以前提出的所有问题的答案。 – 2012-02-22 14:09:31

+0

非常感谢您的回复。然而,它并不是我所面临的代码,我只是不确定在哪里定义什么特别是与消费者。 – 2012-02-22 20:50:36

回答

3

您的队列没有同步,因此多个生产者可以同时拨打push_back,或者同时拨打pop_front ......这会中断。

简单的方法,以使这项工作是使用一个线程安全的队列,这可能是围绕std::queue你已经有了,再加上一个互斥体的包装。

您可以通过添加一个互斥开始,锁定/解锁它周围的每个正打电话给你std::queue - 单个消费者认为应该是足够的,对多个消费者你需要融合front()pop_front()成一个单一的同步呼叫。

要让消费者在队列为空时阻塞,可以向包装器添加条件变量。

这应该足以让您在线找到答案 - 下面的示例代码。


template <typename T> class SynchronizedQueue 
{ 
    std::queue<T> queue_; 
    std::mutex mutex_; 
    std::condition_variable condvar_; 

    typedef std::lock_guard<std::mutex> lock; 
    typedef std::unique_lock<std::mutex> ulock; 

public: 
    void push(T const &val) 
    { 
     lock l(mutex_); // prevents multiple pushes corrupting queue_ 
     bool wake = queue_.empty(); // we may need to wake consumer 
     queue_.push(val); 
     if (wake) condvar_.notify_one(); 
    } 

    T pop() 
    { 
     ulock u(mutex_); 
     while (queue_.empty()) 
      condvar_.wait(u); 
     // now queue_ is non-empty and we still have the lock 
     T retval = queue_.front(); 
     queue_.pop(); 
     return retval; 
    } 
}; 

更换std::mutex等人与任何原语的 “Thread.h” 给你。

+0

请问你能提供一些样品代码吗? – 2012-02-22 14:24:50

+0

你是什么意思的消费区块倒数第二行? – 2012-02-22 14:26:47

+1

假设您的消费者将调用'pop()'获取下一个结果:如果队列为空,则应该阻塞,直到生产者添加一些东西,然后返回它;示例代码来了。 – Useless 2012-02-22 14:31:27

1

我会做的是这样的:

  • 让隐藏您的队列
  • 了一块数据保存到Q,和删除数据块的创建线程安全的存取方法数据类从q(我将使用单个互斥体或访问器的关键部分)
  • 处理情况下,一个consumor没有任何数据可用(睡眠)
  • 处理q正在成为太满了,生产者需要放慢速度
  • 让线程去无可奈何地添加和删除,因为他们生产/消费

还有,记得睡眠加入到每一个线程,否则你会挂在CPU,而不是给线程调度切换上下文并与其他线程/进程共享CPU的好地方。你不需要,但这是一个好习惯。

+0

(Upvote让我超过5k!)= D – Kieveli 2012-02-22 14:25:12

+0

我会投票,但你的建议是不澄清这个问题。 – 2012-02-22 14:28:00

+0

实际上,像这样改变架构固有地解决了这个问题。 – Kieveli 2012-02-22 14:30:36

0

在管理这样的共享状态,你需要一个条件变量和 互斥。其基本模式是线沿线的一个功能:

ScopedLock l(theMutex); 
while (!conditionMet) { 
    theCondition.wait(theMutex); 
} 
doWhatever(); 
theCondition.notify(); 

在你的情况,我可能会做的条件变量和互斥类的 成员执行队列中。写的 conditionMet!queue.full(),所以你最终的东西 这样的:

ScopedLock l(queue.myMutex); 
while (queue.full()) { 
    queue.myCondition.wait(); 
} 
queue.insert(whatever); 
queue.myCondition.notify(); 

和阅读:

ScopedLock l(queue.myMutex); 
while (queue.empty()) { 
    queue.myCondition.wait(); 
} 
results = queue.extract(); 
queue.myCondition.notify(); 
return results; 

根据不同的线程接口上,可能有两个notify 功能:通知其中一个(唤醒单线程),并通知所有 (唤醒所有等待的线程);在这种情况下,你需要 通知所有(或者你需要两个条件变量,一个是空间 写,还有一个东西可以读,每个功能等待一个, 但通知其他)。

0

使用互斥锁保护队列访问,应该是这样。一个'计算机科学101'有界的生产者 - 消费者队列需要两个信号量(管理空闲/空的计数,生产者/消费者等待,就像你已经做的那样),一个mutex/futex/criticalSection保护队列。

我不明白如何用condvars替换信号量和互斥量是非常有帮助的。重点是什么?你如何实施一个有限制的生产者 - 消费者队列,并且在拥有多个生产者/消费者的所有平台上工作的condvars?