2011-12-13 105 views
1

我想实现一个基本的工作池使用pthreads。 这种情况是我想要一个固定数量的工人,这些工人在我的程序的整个持续时间内生活。广播不能与障碍

我永远不需要单线信号,但所有的线程一次,这就是为什么我想做一个单一的广播。

我需要等待所有线程在主程序继续之前完成,所以我决定在每个工作线程中使用barrier_wait。

问题是,如果我的线程调用barrier_wait,则广播不起作用。

完整的示例和可编译的代码如下所示。这仅仅是广播的单触发,在我的完整版,我会ofcause遍历像

while(conditionMet){ 
    1.prepare data 
    2.signal threads using data 
    3.post processing of thread results (because of barrier all threads finished) 
    4.modify conditionMet if needed 
} 

感谢

#include <pthread.h> 
#include <stdio.h> 
#include <unistd.h> 
void checkResults(char *str,int i){ 
    fprintf(stdout,"%s:%d\n",str,i); 
} 
void checkResults(char *str,size_t n,int i){ 
    fprintf(stdout,"%s[%lu]:%d\n",str,n,i); 
} 

/* For safe condition variable usage, must use a boolean predicate and */ 
/* a mutex with the condition.           */ 
int     conditionMet = 0; 
pthread_cond_t  cond = PTHREAD_COND_INITIALIZER; 
pthread_mutex_t  mutex = PTHREAD_MUTEX_INITIALIZER; 
pthread_barrier_t barr; 

#define NTHREADS 3 

void *threadfunc(void *parm) 
{ 
    size_t i = (size_t) parm; 
    int   rc; 

    rc = pthread_mutex_lock(&mutex); 
    checkResults("\tpthread_mutex_lock()",i, rc); 

    while (0==conditionMet) { 
    printf("\tThread blocked[%d]\n",(int)i); 
    rc = pthread_cond_wait(&cond, &mutex); 
    checkResults("\tpthread_cond_wait()",i, rc); 
    checkResults("\tbefore barrier",i); 
    rc = pthread_barrier_wait(&barr);//broadcast works if this is commented out 
    if(rc) 
     fprintf(stdout,"problems waiting for baarr\n"); 
    checkResults("\tafter barrier",i); 
    } 

    rc = pthread_mutex_unlock(&mutex); 
    checkResults("\tpthread_mutex_lock()",i, rc); 
    return NULL; 
} 

int main(int argc, char **argv) 
{ 
    int     rc=0; 
    int     i; 
    pthread_t    threadid[NTHREADS]; 

    if(pthread_barrier_init(&barr, NULL,NTHREADS)) 
    { 
     printf("Could not create a barrier\n"); 
    } 



    printf("Enter Testcase - %s\n", argv[0]); 

    printf("Create %d threads\n", NTHREADS); 
    for(i=0; i<NTHREADS; ++i) { 
    rc = pthread_create(&threadid[i], NULL, threadfunc,(void *) i); 
    if(rc) 
     checkResults("pthread_create()", rc); 
    } 

    sleep(5); /* Sleep isn't a very robust way to serialize threads */ 
    rc = pthread_mutex_lock(&mutex); 
    checkResults("pthread_mutex_lock()", rc); 

    /* The condition has occured. Set the flag and wake up any waiters */ 
    conditionMet = 1; 
    printf("\nWake up all waiters...\n"); 
    rc = pthread_cond_broadcast(&cond); 
    checkResults("pthread_cond_broadcast()", rc); 

    rc = pthread_mutex_unlock(&mutex); 
    checkResults("pthread_mutex_unlock()", rc); 

    printf("Wait for threads and cleanup\n"); 
    for (i=0; i<NTHREADS; ++i) { 
    rc = pthread_join(threadid[i], NULL); 
    checkResults("pthread_join()", rc); 
    } 
    pthread_cond_destroy(&cond); 
    pthread_mutex_destroy(&mutex); 

    printf("Main completed\n"); 
    return 0; 
} 
+1

你真的想让你的工作池中的所有线程都为每个事件而醒来吗?通常你只想要一个线程唤醒。如果是这种情况,你可以看看'pthread_cond_signal'。 –

回答

3

正确接收到信号后,线程函数将锁定mutex。因此,只有一个线程功能将在屏障上等待(mutex仍处于锁定状态),屏障标准将永远不会被满足。

您应该重新设计应用程序的逻辑以使用屏障。在等待屏障之前,mutex必须解锁。另外,如果在代码中使用pthread_cond_wait(),则只有一个线程在您的应用程序中处于活动状态,因此完全不需要多线程。

编辑:

我想阐述的最后一句话了一下。让我们假设我们修改线程函数是这样的:

while (0==conditionMet) {  
    printf("\tThread blocked[%d]\n",(int)i);  
    rc = pthread_cond_wait(&cond, &mutex);  
    checkResults("\tpthread_cond_wait()",i, rc);  
    checkResults("\tbefore barrier",i); 

    pthread_mutex_unlock(&mutex); //added  

    rc = pthread_barrier_wait(&barr);//broadcast works if this is commented out  
    if(rc) 
     fprintf(stdout,"problems waiting for baarr\n");  
    checkResults("\tafter barrier",i); 
} 

这样我们就可以消除僵局的时候只有一个线程能够达到的mutex锁定障碍的原因。但是在给定时间内只有一个线程将在关键部分中运行:当它是pthread_cond_wait()返回mutex被锁定并且它将保持锁定状态,直到线程函数达到_unlock(); _等待();对。只有在这之后,下一个单线程才能运行并达到其屏障。洗涤,冲洗,重复...

OP prolly想要的是让线程函数同时运行(为什么其他人会想要有一个线程池?)。在这种情况下,功能可能看起来像这样:

void *threadfunc(void *parm) 
{ 
/*...*/ 
struct ThreadRuntimeData { 
} rtd; 
while (0==conditionMet) {  
    printf("\tThread blocked[%d]\n",(int)i);  
    rc = pthread_cond_wait(&cond, &mutex);  
    checkResults("\tpthread_cond_wait()",i, rc); 

    GetWorkData(&rtd); //Gets some data from critical section and places it in rtd 
    pthread_mutex_unlock(&mutex); 

    ProcessingOfData(&rtd); //here we do the thread's job 
    //without the modification of global data; this may take a while 

    pthread_mutex_lock(&mutex); 
    PublishProcessedData(&rtd); //Here we modify some global data 
    //with the results of thread's work. 
    //Other threads may do the same, so we had to enter critical section again 
    pthread_mutex_unlock(&mutex); 
    checkResults("\tbefore barrier",i); 
    rc = pthread_barrier_wait(&barr);//broadcast works if this is commented out  
    if(rc) 
     fprintf(stdout,"problems waiting for baarr\n");  
    checkResults("\tafter barrier",i); 
} 
/*...*/ 
} 

这只是一个课程草图。线程函数的优化设计取决于OP希望线程执行的操作。

作为附注,检查pthread_barrier_wait()返回结果的代码必须考虑PTHREAD_BARRIER_SERIAL_THREAD返回。另外宣布conditionMetvolatile会更安全。

+0

+1,但我不确定你最后一句话要去哪。为什么只有一个线程处于活动状态?随着更正,它按预期工作。 – Duck

+0

@Duck:编辑我的帖子来解释它 –

1

从问题中不清楚,输入数据是什么,以及它们与线程和结果之间的关系。我无法从发布的代码中看出来,因为我看不到实际工作应该在哪里完成。

假设你有ñ(子)任务,ñ线程和希望主线程等待ñ结果:你并不真的需要一个障碍,你可以做:

  • 主线程
    1. 推输入队列ň任务
    2. 等待,直到你收到ñ个结果输出队列
  • 工作者线程
    1. 从输入队列弹出一个任务
    2. 计算结果
    3. 推结果到输出队列

最简单的可能的同步队列会一次推送/弹出一个项目(如果队列为空,则按下信号,如果队列为w,则弹出等待为空,就这样)。

您可以很容易地添加其中广播的push_n(vector<task> const &input)pop_n(int count, vector<result> &output)等待所有count结果,作为优化,但基本模式是相同的。

0

你让自己的事情变得比你要难得多。摆脱障碍。

如果你想等到所有的工作都完成了,只需要保留一定数量的工作。用互斥锁保护它。用条件变量触发它。然后只需使用pthread_cond_wait等待它为零。 (您可以使用您已用于处理作业队列的相同逻辑。)

或者,在线程没有更多工作要做时终止线程编码。然后等到所有工作线程都以pthread_join终止。