我正在使用 pthreads 编写一个生产者-消费者示例,思路如下。生产者生成一个新值 `k`,该值由所有消费者线程共享。共有 `n_consumers` 个消费者线程,而生产者只有一个。为了便于访问 `k`,使用了一个包含 `n_consumers` 个元素的数组 `pool`。这样,每次生成新的 `k` 时,它都会被复制到整个 `pool` 中(`pool[0]=k; pool[1]=k; ...; pool[n_consumers-1]=k`)。
问题(pthreads 条件变量):cond 信号有可能被忽略吗?
这是我的代码片段:
/*
 * Consumer thread body -- the ORIGINAL version from the question, kept as-is
 * because it demonstrates the lost-wakeup problem being asked about.
 *
 * args: pointer to an int holding this consumer's id; the id is also used as
 *       the index of this consumer's slot in pool.
 * Returns NULL when stop_condition() accepts the value in pool[id].
 */
void *consumer (void *args) {
int id = *(int *) args;
while (1) {
// rendezvous with the other threads (barrier() itself is not shown in this snippet)
barrier (&barrier1, id);
// 1. lock
pthread_mutex_lock (&mu);
// 2. wait
// NOTE(review): this wait is unconditional -- no shared predicate is tested.
// pthread_cond_broadcast only wakes threads that are already blocked in
// pthread_cond_wait, so if the producer broadcasts before this thread gets
// here, nothing records that fact and this thread blocks until a later
// broadcast (presumably what happened to consumer 0 in the output below).
pthread_cond_wait (&cond_producer_is_ready, &mu);
// 3. unlock
pthread_mutex_unlock (&mu);
// 4. do something with pool[id] value
printf ("thread %d using value %d\n", id, pool[id]);
// 5. stop?
if (stop_condition(pool[id]))
break;
}
return NULL;
}
/*
 * Producer thread body -- the ORIGINAL version from the question, kept as-is
 * because it demonstrates the lost-wakeup problem being asked about.
 *
 * args: not used by this function.
 * Returns NULL when stop_condition() accepts the value in pool[0].
 */
void *producer (void *args) {
int i;
// the producer uses the first id after the consumers' ids
int id = n_consumers;
while (1) {
// rendezvous with the other threads (barrier() itself is not shown in this snippet)
barrier(&barrier1, id);
// 1. lock
pthread_mutex_lock (&mu);
// 2. produce some new values
for (i=0; i<n_consumers; i++)
pool[i]++;
// 3. send message indicating a new value is available
printf ("producer sends broadcast...\n");
// NOTE(review): the broadcast is the only notification -- no shared state is
// updated to record that a value was produced, so a consumer that has not
// yet reached pthread_cond_wait will not see this event.
pthread_cond_broadcast (&cond_producer_is_ready);
// 4. unlock
pthread_mutex_unlock (&mu);
// 5. stop?
// it could be pool[x], it does not matter the index
if (stop_condition(pool[0]))
break;
}
return NULL;
}
这是输出:
thread 0 in barrier (count 1)
thread 2 in barrier (count 2)
thread 1 in barrier (count 3)
thread 3 in barrier (count 4)
thread 4 in barrier (count 5) <- all the threads are in the barrier (OK)
producer sends broadcast... <- producer send the message to access to k
thread 4 in barrier (count 1) <- the producer waits in the barrier
thread 1 using value 1 <- consumer 1 received the message and use k=1
thread 1 in barrier (count 2)
thread 3 using value 1 <- consumer 2 and 3 received the message too
thread 3 in barrier (count 3)
thread 2 using value 1
thread 2 in barrier (count 4) <- only the consumer 0 did not received the message
看来,只有部分消费者收到了信号 `cond_producer_is_ready`:在本例中是消费者线程 1、2 和 3,而消费者线程 0 仍在等待该消息。屏障工作正常,因为所有线程都能到达它;问题出在消息的接收上。是否有办法确定某个线程是收到了还是忽略了某条已发送的消息(例如 `cond_producer_is_ready`)?
更新:感谢 @caf 的回答。为完整起见,下面给出原始代码的修正版本,它不再有之前的错误(也可从 pastebin 复制):
/*
| barrier_and_pool.c
| $ gcc barrier_and_pool.c -pthread -o barrier_and_pool.out
* */
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
// shared resource
// One int slot per consumer, indexed by consumer id; allocated in main().
int *pool = NULL;
// producer and consumers
int n_consumers = 4;
// Number of threads barrier() waits for before releasing a round.
int numproc = 5; // n_consumers + 1 producer
// NOTE(review): not referenced by any code visible in this file.
int new_value_available = 0; // mutex flag
// Count of values produced so far; the producer increments it while holding
// mu, and each consumer compares it with its own consumed count to decide
// whether it still has to wait.
int last_value_produced = 0; // mutex flag
// mu guards last_value_produced and is the mutex paired with
// cond_producer_is_ready, which the producer broadcasts after each new value.
pthread_mutex_t mu = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond_producer_is_ready = PTHREAD_COND_INITIALIZER;
/*
 * Reusable barrier for a group of numproc threads.
 * Every field is read and written only while barrier_mutex is held.
 */
typedef struct
{
    int cur_count;                 /* threads that have arrived in the current round */
    pthread_mutex_t barrier_mutex; /* protects cur_count and generation */
    pthread_cond_t barrier_cond;   /* broadcast when a round completes */
    unsigned long generation;      /* round number; incremented each time the barrier opens */
} barrier_t;

/* The single barrier shared by the producer and all consumers. */
barrier_t pbarrier;

/*
 * Prepare a barrier for use: default-initialise its mutex and condition
 * variable and reset the arrival count and the round number.
 */
void barrier_init(barrier_t * mybarrier)
{
    pthread_mutex_init(&(mybarrier->barrier_mutex), NULL);
    pthread_cond_init(&(mybarrier->barrier_cond), NULL);
    mybarrier->cur_count = 0;
    mybarrier->generation = 0;
}

/*
 * Block the calling thread until numproc threads have called barrier() on
 * mybarrier, then release all of them. The barrier can be reused immediately
 * for the next round.
 *
 * mybarrier: barrier to wait on (must have been set up with barrier_init).
 * id:        caller's thread id, used only in the progress messages.
 */
void barrier(barrier_t * mybarrier, int id)
{
    pthread_mutex_lock(&(mybarrier->barrier_mutex));
    mybarrier->cur_count++;
    printf("thread %d in barrier (count %d)\n", id, mybarrier->cur_count);
    if (mybarrier->cur_count != numproc) {
        /*
         * Wait for the round to change rather than for a single wakeup:
         * pthread_cond_wait may return spuriously, and cur_count cannot be
         * used as the predicate because a fast thread may already have
         * re-entered the barrier and incremented it for the next round.
         */
        unsigned long my_generation = mybarrier->generation;
        while (my_generation == mybarrier->generation) {
            pthread_cond_wait(&(mybarrier->barrier_cond), &(mybarrier->barrier_mutex));
        }
    } else {
        /* Last arrival: start a new round and release everybody else. */
        mybarrier->cur_count = 0;
        mybarrier->generation++;
        printf ("broadcast sent by thread %d\n", id);
        pthread_cond_broadcast(&(mybarrier->barrier_cond));
    }
    pthread_mutex_unlock(&(mybarrier->barrier_mutex));
    printf ("thread %d out of barrier\n", id);
}
/*
 * Decide whether the run is over.
 * Returns 1 once k_value has reached 5 (i.e. after five iterations),
 * and 0 for any smaller value.
 */
int stop_condition (int k_value) {
    return (k_value < 5) ? 0 : 1;
}
void *consumer (void *args) {
int id = *(int *) args;
int last_value_consumed = 0;
while (1) {
barrier (&pbarrier, id);
// 1. lock
pthread_mutex_lock (&mu);
// 2. wait
while (last_value_produced == last_value_consumed)
pthread_cond_wait (&cond_producer_is_ready, &mu);
last_value_consumed++;
// 3. unlock
pthread_mutex_unlock (&mu);
// 4. do something with pool[id] value
printf ("thread %d using value %d\n", id, pool[id]);
// 5. stop?
if (stop_condition(pool[id]))
break;
}
return NULL;
}
/*
 * Producer thread body.
 *
 * args: unused.
 *
 * Each round the thread meets the consumers at pbarrier, then, holding mu,
 * increments every slot of pool, bumps last_value_produced and broadcasts
 * cond_producer_is_ready. The loop ends, and NULL is returned, once
 * stop_condition() accepts pool[0] (all slots hold the same value, so any
 * index would do).
 */
void *producer (void *args) {
    const int my_id = n_consumers;   /* first id after the consumers' ids */
    int slot;

    (void) args;

    do {
        barrier(&pbarrier, my_id);

        pthread_mutex_lock (&mu);

        /* Produce the next value in every consumer's slot. */
        slot = 0;
        while (slot < n_consumers) {
            pool[slot] += 1;
            slot++;
        }

        /* Record the new value before announcing it, so late waiters see it. */
        last_value_produced += 1;
        printf ("producer sends broadcast (last_value_produced: %d)...\n", last_value_produced);
        pthread_cond_broadcast (&cond_producer_is_ready);

        pthread_mutex_unlock (&mu);
    } while (!stop_condition(pool[0]));

    return NULL;
}
int main (int argc, char *argv[]) {
pool = (int*) malloc (sizeof(int)*n_consumers);
int *ids = (int*) malloc(sizeof(int)*n_consumers);
pthread_t pro;
pthread_t cons[n_consumers];
// init barrier
barrier_init(&pbarrier);
// init consumers
int i;
for (i=0; i<n_consumers; i++) {
ids[i] = i;
pool[i] = 0;
pthread_create (&cons[i], NULL, consumer, &ids[i]);
}
// init producer
pthread_create (&pro, NULL, producer, NULL);
// join
for (i=0; i<n_consumers; i++) {
pthread_join (cons[i], NULL);
}
pthread_join (pro, NULL);
return 0;
}