2011-09-05 45 views
4

我的问题与How do I check if a thread is terminated when using pthread?类似。但我没有完全得到答案。多个线程之一的Pthread_join

我的问题是...我创建的线程一定数量的别回答n。只要main检测到任何一个线程的退出,它就会创建另一个线程,从而将并发度保持为​​n等等。

如何在主线程检测线程的退出。 pthread_join等待一个特定的线程退出,但在我的情况下,它可以是n个线程中的任何一个。

谢谢

回答

3

最明显的,而不AIX表明重组你的代码,是让每个线程设置的东西,以表明它已经完成(可能是在所有线程之间共享一个数组的值,每个工人线程一个插槽),然后信号一个条件变量。主线程在条件变量上等待,每次唤醒时,处理所有表示自己已完成的线程:可能有多个线程。

当然,这就是说,如果线程被取消,你永远不会得到信号中,所以使用的取消处理或不取消该线程。

+0

听起来不错。但是,如果一个线程在一个条件变量上发出信号,甚至在主线程正在等待它之前......说主线程创建10个线程并且线程1信号在线程10被创建之前。 –

+2

@Juggler:你需要正确使用条件变量,我已经遗漏了细节,但是有关于如何去做或者检查pthreads教程的问题。主线程将(1)获取互斥锁,(2)检查做什么和做什么,(3)在condvar上等待,(4)转到2.工人将(1)获取互斥锁,(2)记录下需要做的事情,(3)发出condvar信号,(4)释放互斥信号。 –

+0

啊......你可能在暗示一个线程池的一种实现。 –

2

有几种方法可以解决这个问题。

一种自然的方法是拥有一个固定大小的线程池n,并有一个队列,主线程将放置任务并从中获取任务并处理它们。这将保持不变程度的并发性。

一种替代方案是具有与设置为n初始值的信号量。每次创建工作线程时,信号量的值都需要递减。每当工人即将终止时,就需要增加(“发布”)信号量。现在,等待主线程中的信号量将被阻止,直到剩下的工人数量少于n;然后会产生一个新的工作线程,等待恢复。既然你不会在工人上使用pthread_join,他们应该分开(pthread_detach)。

0

如果你想被告知线程退出(通过pthread_exit或取消),您可以使用带有pthread_cleanup_push的处理程序,告知孩子退出的主线程(通过一个条件变量,信号量或类似的),所以它可以在或者等待它,或者直接开始一个新的(假设孩子先被分离)。

或者,我建议有线程等待更多的工作(通过@aix的建议),而不是结束。

0

如果你的父线程需要做其他的其他事情,那么就不能只是不断地阻塞pthread_join,你需要一种方法来从子线程发送消息给主线程告诉它调用pthread_join 。有许多IPC机制可以用于此目的。

当一个子线程完成它的工作后,它会通过IPC发送某种消息给主线程,说“我完成了我的工作”并且还传递了它自己的线程ID,然后主线程知道调用pthread_join在该线程ID上。

0

一个简单的方法是使用管道作为(工作)线程和主线程之间的通信通道。当一个线程终止时,它将结果(下面例子中的线程ID)写入管道。主线程在管道上等待,并在其可用时立即读取线程结果。

与互斥锁或信号量不同,管道文件描述符可以通过应用程序主事件循环(如libevent)轻松处理。只要他们写入PIPE_BUF或更少的字节(我的Linux上为4096),从不同线程写入同一管道的操作就是原子操作。

下面是一个创建10个线程的演示,每个线程具有不同的使用期限。然后主线程等待任何线程终止并打印其线程ID。当所有十个线程都完成时它终止。

$ cat test.cc 
#include <iostream> 
#include <pthread.h> 
#include <unistd.h> 
#include <stdlib.h> 
#include <time.h> 

void* thread_fun(void* arg) { 
    // do something 
    unsigned delay = rand() % 10; 
    usleep(delay * 1000000); 

    // notify termination 
    int* thread_completed_fd = static_cast<int*>(arg); 
    pthread_t thread_id = pthread_self(); 
    if(sizeof thread_id != write(*thread_completed_fd, &thread_id, sizeof thread_id)) 
     abort(); 

    return 0; 
} 

int main() { 
    int fd[2]; 
    if(pipe(fd)) 
     abort(); 

    enum { THREADS = 10 }; 

    time_t start = time(NULL); 

    // start threads 
    for(int n = THREADS; n--;) { 
     pthread_t thread_id; 
     if(pthread_create(&thread_id, NULL, thread_fun, fd + 1)) 
      abort(); 
     std::cout << time(NULL) - start << " sec: started thread " << thread_id << '\n'; 
    } 

    // wait for the threads to finish 
    for(int n = THREADS; n--;) { 
     pthread_t thread_id; 
     if(sizeof thread_id != read(fd[0], &thread_id, sizeof thread_id)) 
      abort(); 
     if(pthread_join(thread_id, NULL)) // detached threads don't need this call 
      abort(); 
     std::cout << time(NULL) - start << " sec: thread " << thread_id << " has completed\n"; 
    } 

    close(fd[0]); 
    close(fd[1]); 
} 

$ g++ -o test -pthread -Wall -Wextra -march=native test.cc 
$ ./test 
0 sec: started thread 140672287479552 
0 sec: started thread 140672278759168 
0 sec: started thread 140672270038784 
0 sec: started thread 140672261318400 
0 sec: started thread 140672252598016 
0 sec: started thread 140672243877632 
0 sec: started thread 140672235157248 
0 sec: started thread 140672226436864 
0 sec: started thread 140672217716480 
0 sec: started thread 140672208996096 
1 sec: thread 140672208996096 has completed 
2 sec: thread 140672226436864 has completed 
3 sec: thread 140672287479552 has completed 
3 sec: thread 140672243877632 has completed 
5 sec: thread 140672252598016 has completed 
5 sec: thread 140672261318400 has completed 
6 sec: thread 140672278759168 has completed 
6 sec: thread 140672235157248 has completed 
7 sec: thread 140672270038784 has completed 
9 sec: thread 140672217716480 has completed 
相关问题