2016-12-05 51 views
0

我目前有一个生产者 - 消费者设置的两个线程,它使用pthread_cond_wait()pthread_cond_signal()来交替读取数据和处理数据。如何将第二个消费者添加到基于pthread的生产者 - 消费者设置?

说我有一个锁,两个条件,并且指出如果数据缓冲器中有数据的布尔标志:

pthread_mutex_t lock; 
pthread_cond_t we_have_data; 
pthread_cond_t we_need_data; 
bool buffer_is_empty = true; 

我有一个使用以下函数来产生数据的pthread_t(读数据成缓冲液):

static void* produce(void* arg) { 
    pthread_mutex_lock(&lock); 
    for (;;) { 
     while (!buffer_is_empty) { 
      pthread_cond_wait(&we_need_data, &lock); 
     } 
     pthread_mutex_unlock(&lock); 
     // read some data into our buffer 
     pthread_mutex_lock(&lock); 
     buffer_is_empty = false; 
     pthread_cond_signal(&we_have_data); 
    } 
} 

然后我有一个使用以下代码来消耗该数据,在接收到we_have_data信号的第二pthread_t

static void* consume(void* arg) { 
    pthread_mutex_lock(&lock); 
    for (;;) { 
     while (buffer_is_empty) { 
      pthread_cond_wait(&we_have_data, &lock); 
     } 
     pthread_mutex_unlock(&lock); 
     // process the data in our buffer 
     pthread_mutex_lock(&lock); 
     buffer_is_empty = true; 
     pthread_cond_signal(&we_need_data); 
    } 
} 

这工作正常。

我现在想要做的是添加第三个线程,如果缓冲区中包含某些数据,它将对consume()函数的数据起作用。

我曾尝试添加第三个条件,但我的程序挂起。

我设置了一个条件和布尔标志:

bool processing_with_second_consumer; 
pthread_cond_t we_need_to_process_data_with_another_consumer; 

然后我修改消费者:

static void* consume(void* arg) { 
    pthread_mutex_lock(&lock); 
    for (;;) { 
     while (buffer_is_empty && !processing_with_second_consumer) { 
      pthread_cond_wait(&we_have_data, &lock); 
     } 
     pthread_mutex_unlock(&lock); 
     // process the data in our buffer 
     pthread_mutex_lock(&lock); 
     if (data_meets_our_conditions) { 
      processing_with_second_consumer = true; 
      pthread_cond_signal(&we_need_to_process_data_with_another_consumer); 
     } 
     buffer_is_empty = true; 
     pthread_cond_signal(&we_need_data); 
    } 
} 

然后我修改了生产者等待布尔:

static void* produce(void* arg) { 
    pthread_mutex_lock(&lock); 
    for (;;) { 
     while (!buffer_is_empty && !processing_with_second_consumer) { 
      pthread_cond_wait(&we_need_data, &lock); 
     } 
     pthread_mutex_unlock(&lock); 
     // read some data into our buffer 
     pthread_mutex_lock(&lock); 
     buffer_is_empty = false; 
     pthread_cond_signal(&we_have_data); 
    } 
} 

并添加第三个线程从消费者消费:

static void* consume_from_the_consumer(void* arg) { 
    pthread_mutex_lock(&lock); 
    for (;;) { 
     while (!buffer_is_empty && processing_with_second_consumer) { 
      pthread_cond_wait(&we_need_to_process_data_with_another_consumer, &lock); 
     } 
     pthread_mutex_unlock(&lock); 
     // do more specific processing of the data in our buffer 
     pthread_mutex_lock(&lock); 
     processing_with_second_consumer = false; 
    } 
} 

我似乎无法让程序正确退出 - 它基本上挂在消费者消费的无限循环中。

为了允许第三个(或第四个或第五个等)线程,如何正确设置pthread条件的信号?

回答

0

为了解决这个问题的三个线程,我需要做一些修改:

  1. 移动互斥锁进线回路;循环应该做的第一件事是锁定数据,它应该做的最后一件事是解锁它。
  2. 设置三个bool标志:is_new_line_availableis_new_subdata_availableis_eof
  3. 设置三个pthread_cond_t条件:new_line_is_available,new_line_is_emptynew_subdata_is_available
  4. 确保每个线程都有调用pthread_exit()来终止该线程的条件。

的生产线:

static void* produce(void* arg) { 
    for (;;) { 
     pthread_mutex_lock(&lock); 
     while (is_new_line_available) { 
      pthread_cond_wait(&new_line_is_empty, &lock); 
     } 
     // ... read a line of data into buffer ... 
     if (EOF) { 
      is_new_line_available = true; 
      is_new_subdata_available = true; 
      is_eof = false; 
      pthread_cond_signal(&new_line_is_available); 
      pthread_cond_signal(&new_subdata_is_available); 
      pthread_mutex_unlock(&lock); 
      pthread_exit(NULL); 
     } 
     is_new_line_available = true; 
     is_new_chromosome_available = false; 
     is_eof = false; 
     pthread_cond_signal(&new_line_is_available); 
     pthread_mutex_unlock(&lock); 
    } 
} 

的消耗螺纹:

static void* consume(void* arg) { 
    for (;;) { 
     pthread_mutex_lock(&lock); 
     while (is_new_line_available) { 
      pthread_cond_wait(&new_line_is_available, &lock); 
     } 
     // ... process line of data to look for subdata type ... 
     if (EOF) { 
      is_eof = true; 
      pthread_cond_signal(&new_subdata_is_available); 
      pthread_mutex_unlock(&lock); 
      pthread_exit(NULL); 
     } 
     else if (subdata_found) { 
      is_new_subdata_available = true; 
      is_new_line_available = false; 
      pthread_cond_signal(&new_line_is_empty); 
     } 
     pthread_mutex_unlock(&lock); 
    } 
} 

然后,第三个 “子数据” - 处理线程:

static void* consume_subdata_from_the_consumer(void* arg) { 
    for (;;) { 
     if (is_eof) { 
      pthread_exit(NULL); 
     } 
     pthread_mutex_lock(&lock); 
     while (!is_new_subdata_available) { 
      pthread_cond_wait(&new_subdata_is_available, &lock); 
     } 
     // ... process subdata ... 
     is_new_subdata_available = false; 
     is_new_line_available = true; 
     pthread_cond_signal(new_line_is_available); 
     pthread_mutex_unlock(&lock); 
    } 
} 

一些观察:

  • 所有的线程应该有一个条件,让他们到pthread_exit(),或父进程将挂起。
  • 需要在锁定和解锁指令之间拉取修改状态的所有代码,或者无序处理的数据可能会损坏。
  • 任何缓冲区溢出或写入已初始化的数据都可能导致问题。例如,使用calloc()可以在线程中使用它之前初始化字符缓冲区。
1

您只制作信号we_have_data。但是由于它将buffer_is_empty设置为false,它可以使consume_from_the_consumer线程就绪,但它不会解除阻塞,因为它在第二个条件变量上被阻止。

为了使您的生活更简单,我建议两个转变:

  1. 始终使用pthread_cond_broadcast
  2. 只能使用一个条件变量。

这可能效率会稍低一点,但有几个完整类别的微妙的错误,这是不可能的。

+0

我无法得到这个工作,似乎。通过三个线程和一个“开/关”条件,两个线程在给定时间将被解除阻塞。你有这样的例子吗? –

+0

@AlexReynolds两个线程将被解锁,但“错误”的线程会立即再次阻止。这就是'while'循环的意义所在。当你尝试时出了什么问题? –

+0

我的程序挂在处理数据上。 –