2016-02-05 45 views
0

我正在使用pthreads中的生产者 - 消费者示例。这个想法如下。生产者生成一个在所有消费者线程中共享的新值k。有n_consumers线程和只有一个生产者。为了便于访问k,使用了一个包含n_consumers元素的数组。这样,每次生成一个新的k时,它都被复制到整个poolpool[0]=k; pool[1]=k;...; pool[n_consumers-1]=k)。pthreads条件 - 有可能忽略cond信号?

这是我的代码片段:

void *consumer (void *args) { 

    int id = *(int *) args; 

    while (1) { 

     barrier (&barrier1, id); 

     // 1. lock 
     pthread_mutex_lock (&mu); 

     // 2. wait 
     pthread_cond_wait (&cond_producer_is_ready, &mu); 

     // 3. unlock 
     pthread_mutex_unlock (&mu); 

     // 4. do something with pool[id] value 
     printf ("thread %d using value %d\n", id, pool[id]); 

     // 5. stop? 
     if (stop_condition(pool[id])) 
      break; 
    }  
    return NULL; 
} 

void *producer (void *args) { 

    int i; 
    int id = n_consumers; 

    while (1) { 

     barrier(&barrier1, id); 

     // 1. lock 
     pthread_mutex_lock (&mu);  

     // 2. produce some new values 
     for (i=0; i<n_consumers; i++) 
      pool[i]++; 

     // 3. send message indicating a new value is available 
     printf ("producer sends broadcast...\n"); 
     pthread_cond_broadcast (&cond_producer_is_ready); 

     // 4. unlock 
     pthread_mutex_unlock (&mu); 

     // 5. stop? 
     // it could be pool[x], it does not matter the index 
     if (stop_condition(pool[0])) 
      break; 
    } 
    return NULL; 
} 

这是输出:

thread 0 in barrier (count 1) 
thread 2 in barrier (count 2) 
thread 1 in barrier (count 3) 
thread 3 in barrier (count 4) 
thread 4 in barrier (count 5) <- all the threads are in the barrier (OK) 
producer sends broadcast... <- producer send the message to access to k 
thread 4 in barrier (count 1) <- the producer waits in the barrier 
thread 1 using value 1   <- consumer 1 received the message and use k=1 
thread 1 in barrier (count 2) 
thread 3 using value 1   <- consumer 2 and 3 received the message too 
thread 3 in barrier (count 3) 
thread 2 using value 1 
thread 2 in barrier (count 4) <- only the consumer 0 did not received the message 

看来,只有少数消费者接收信号cond_producer_is_ready,在这种情况下,消费者线程1,2和3,而消费者线程0仍在等待这样的消息。屏障工作正常,因为所有的线程都可以达到它。然而,问题在于消息的接收。是否可以确定给定线程是否收到或忽略给定的已发送消息(如cond_producer_is_ready)?

更新 感谢@caf的回答。这是我原来的代码修正版本,没有以前的错误,只是为了完整性(或pastebin复制):

/* 
| barrier_and_pool.c 
| $ gcc barrier_and_pool.c -pthread -o barrier_and_pool.out 
* */ 

#include <stdio.h> 
#include <stdlib.h> 
#include <pthread.h> 
#include <unistd.h> 

// shared resource 
int *pool        = NULL;  

// producer and consumers 
int n_consumers       = 4; 
int numproc        = 5; // n_consumers + 1 producer 
int new_value_available     = 0; // mutex flag 
int last_value_produced     = 0; // mutex flag 

pthread_mutex_t mu      = PTHREAD_MUTEX_INITIALIZER; 
pthread_cond_t cond_producer_is_ready = PTHREAD_COND_INITIALIZER; 

typedef struct 
{ 
    int    cur_count; 
    pthread_mutex_t barrier_mutex; 
    pthread_cond_t barrier_cond; 
}    barrier_t; 

barrier_t pbarrier; 

void barrier_init(barrier_t * mybarrier) 
{ 
    pthread_mutex_init(&(mybarrier->barrier_mutex), NULL); 
    pthread_cond_init(&(mybarrier->barrier_cond), NULL); 
    mybarrier->cur_count = 0; 
} 


void barrier(barrier_t * mybarrier, int id) { 
    pthread_mutex_lock(&(mybarrier->barrier_mutex)); 
    mybarrier->cur_count++; 
    printf("thread %d in barrier (count %d)\n", id, mybarrier->cur_count); 
    if (mybarrier->cur_count!=numproc) { 
     pthread_cond_wait(&(mybarrier->barrier_cond), &(mybarrier->barrier_mutex)); 
    } 
    else 
    { 
     mybarrier->cur_count=0; 
     printf ("broadcast sent by thread %d\n", id); 
     pthread_cond_broadcast(&(mybarrier->barrier_cond)); 
    } 
    pthread_mutex_unlock(&(mybarrier->barrier_mutex)); 

    printf ("thread %d out of barrier\n", id); 
} 

int stop_condition (int k_value) { 
    if (k_value >= 5)  // we stop after five iters 
     return 1; 
    else 
     return 0; 
} 

void *consumer (void *args) { 

    int id = *(int *) args; 
    int last_value_consumed = 0; 

    while (1) { 

     barrier (&pbarrier, id); 

     // 1. lock 
     pthread_mutex_lock (&mu); 

     // 2. wait 
     while (last_value_produced == last_value_consumed) 
      pthread_cond_wait (&cond_producer_is_ready, &mu); 

     last_value_consumed++; 

     // 3. unlock 
     pthread_mutex_unlock (&mu); 

     // 4. do something with pool[id] value 
     printf ("thread %d using value %d\n", id, pool[id]); 

     // 5. stop? 
     if (stop_condition(pool[id])) 
      break; 
    }  
    return NULL; 
} 

void *producer (void *args) { 

    int i; 
    int id = n_consumers; 

    while (1) { 

     barrier(&pbarrier, id); 

     // 1. lock 
     pthread_mutex_lock (&mu);  

     // 2. produce some new values 
     for (i=0; i<n_consumers; i++) 
      pool[i]++; 

     // 3. send message indicating a new value is available 
     last_value_produced++; 
     printf ("producer sends broadcast (last_value_produced: %d)...\n", last_value_produced); 
     pthread_cond_broadcast (&cond_producer_is_ready); 

     // 4. unlock 
     pthread_mutex_unlock (&mu); 

     // 5. stop? 
     // it could be pool[x], it does not matter the index 
     if (stop_condition(pool[0])) 
      break; 
    } 
    return NULL; 
} 

int main (int argc, char *argv[]) { 


    pool = (int*) malloc (sizeof(int)*n_consumers); 

    int *ids = (int*) malloc(sizeof(int)*n_consumers); 

    pthread_t pro; 
    pthread_t cons[n_consumers]; 

    // init barrier 
    barrier_init(&pbarrier); 

    // init consumers 
    int i; 
    for (i=0; i<n_consumers; i++) { 
     ids[i] = i; 
     pool[i] = 0; 
     pthread_create (&cons[i], NULL, consumer, &ids[i]); 
    } 

    // init producer 
    pthread_create (&pro, NULL, producer, NULL); 

    // join 
    for (i=0; i<n_consumers; i++) { 
     pthread_join (cons[i], NULL); 
    } 

    pthread_join (pro, NULL); 

    return 0; 
} 

回答

2

与您的代码的问题是,生产者可以前的任何或全部获得互斥体消费者,在这种情况下消费者将在pthread_mutex_lock();处等待条件被发信号时 - 所以他们将永远等待pthread_cond_wait()(信号不排队:如果你不等待条件变量获得发信号,你会错过它)。

这就是为什么pthread条件变量必须与某个共享状态的条件配对 - 称为谓词。而不是只打电话pthread_cond_wait(),你把它的循环中测试谓词:

while (!new_value_available) 
    pthread_cond_wait (&cond_producer_is_ready, &mu); 

这样,如果生产商抢先或消费者也没关系:如果消费者之前达到其临界区生产者,谓词将是错误的,消费者会等待;如果生产者在消费者之前到达其关键部分,则情况将是真实的,并且消费者将继续。

在这种情况下,要创建谓词,可以将全局共享变量last_value_produced初始化为零,并在生成器刚刚广播条件变量之前将其加1。每个消费者保持一个局部变量last_value_consumed也初始化为零,且条件变为:

while (last_value_consumed == last_value_produced) 
    pthread_cond_wait(&cond_producer_is_ready, &mu); 

消费者开始递增last_value_consumed