2011-04-27 53 views
4

我实现了我的Ubuntu服务器的简单的线程池机制(我的多客户端匿名聊天程序),我需要让我的工作线程睡眠,直到一个作业(在一个函数指针的形式和参数)需要执行。线程等待家长

我现在的系统正在走出窗口。我(工作者线程)向经理询问工作是否可用,以及是否有5ms没有睡眠。如果有,请将作业添加到工作队列并运行该功能。可怜的周期浪费。

我想要做的是做一个简单的事件样的系统。我正在考虑有一个互斥量向量(每个worker都有一个向量),并且在创建时将互斥量的句柄作为参数传入。然后在我的经理类(它保存并分发作业)中,每当创建一个线程时,都会锁定该互斥锁。当需要执行任务时,解锁下一个互斥锁,等待它被锁定和解锁,然后重新锁定它。不过,我想知道是否有更好的方法来达到这个目的。


tldr;所以我的问题是这样的。让一个线程等待管理类工作的最有效,最有效和最安全的方法是什么?轮询我应该甚至考虑的技术(每次超过1000个客户端),互斥锁是否体面?还是有其他技术?

回答

6

你需要的是条件变量。
所有的工作线程调用wait()会暂停它们。

父线程然后戴上一个队列中的工作项目,并呼吁在条件变量信号。这将唤醒正在睡觉的一条线。它可以从队列中删除作业执行作业然后调用等待条件变量返回到休眠状态。

尝试:

#include <pthread.h> 
#include <memory> 
#include <list> 

// Use RAII to do the lock/unlock 
struct MutexLock 
{ 
    MutexLock(pthread_mutex_t& m) : mutex(m) { pthread_mutex_lock(&mutex); } 
    ~MutexLock()        { pthread_mutex_unlock(&mutex); } 
    private: 
     pthread_mutex_t& mutex; 
}; 

// The base class of all work we want to do. 
struct Job 
{ 
    virtual void doWork() = 0; 
}; 

// pthreads is a C library the call back must be a C function. 
extern "C" void* threadPoolThreadStart(void*); 

// The very basre minimal part of a thread pool 
// It does not create the workers. You need to create the work threads 
// then make them call workerStart(). I leave that as an exercise for you. 
class ThreadPool 
{ 

    public: 
     ThreadPool(unsigned int threadCount=1); 
     ~ThreadPool(); 

     void addWork(std::auto_ptr<Job> job); 
    private: 

     friend void* threadPoolThreadStart(void*); 
     void workerStart(); 

     std::auto_ptr<Job> getJob(); 

     bool    finished; // Threads will re-wait while this is true. 
     pthread_mutex_t  mutex;  // A lock so that we can sequence accesses. 
     pthread_cond_t  cond;  // The condition variable that is used to hold worker threads. 
     std::list<Job*>  workQueue; // A queue of jobs. 
     std::vector<pthread_t>threads; 
}; 

// Create the thread pool 
ThreadPool::ThreadPool(int unsigned threadCount) 
    : finished(false) 
    , threads(threadCount) 
{ 
    // If we fail creating either pthread object than throw a fit. 
    if (pthread_mutex_init(&mutex, NULL) != 0) 
    { throw int(1); 
    } 

    if (pthread_cond_init(&cond, NULL) != 0) 
    { 
     pthread_mutex_destroy(&mutex); 
     throw int(2); 
    } 
    for(unsigned int loop=0; loop < threadCount;++loop) 
    { 
     if (pthread_create(threads[loop], NULL, threadPoolThreadStart, this) != 0) 
     { 
      // One thread failed: clean up 
      for(unsigned int kill = loop -1; kill < loop /*unsigned will wrap*/;--kill) 
      { 
       pthread_kill(threads[kill], 9); 
      } 
      throw int(3); 
     } 
    } 
} 

// Cleanup any left overs. 
// Note. This does not deal with worker threads. 
//  You need to add a method to flush all worker threads 
//  out of this pobject before you let the destructor destroy it. 
ThreadPool::~ThreadPool() 
{ 
    finished = true; 
    for(std::vector<pthread_t>::iterator loop = threads.begin();loop != threads.end(); ++loop) 
    { 
     // Send enough signals to free all threads. 
     pthread_cond_signal(&cond); 
    } 
    for(std::vector<pthread_t>::iterator loop = threads.begin();loop != threads.end(); ++loop) 
    { 
     // Wait for all threads to exit (they will as finished is true and 
     //        we sent enough signals to make sure 
     //        they are running). 
     void* result; 
     pthread_join(*loop, &result); 
    } 
    // Destroy the pthread objects. 
    pthread_cond_destroy(&cond); 
    pthread_mutex_destroy(&mutex); 

    // Delete all re-maining jobs. 
    // Notice how we took ownership of the jobs. 
    for(std::list<Job*>::const_iterator loop = workQueue.begin(); loop != workQueue.end();++loop) 
    { 
     delete *loop; 
    } 
} 

// Add a new job to the queue 
// Signal the condition variable. This will flush a waiting worker 
// otherwise the job will wait for a worker to finish processing its current job. 
void ThreadPool::addWork(std::auto_ptr<Job> job) 
{ 
    MutexLock lock(mutex); 

    workQueue.push_back(job.release()); 
    pthread_cond_signal(&cond); 
} 

// Start a thread. 
// Make sure no exceptions escape as that is bad. 
void* threadPoolThreadStart(void* data) 
{ 
    ThreadPool* pool = reinterpret_cast<ThreadPool*>(workerStart); 
    try 
    { 
     pool->workerStart(); 
    } 
    catch(...){} 
    return NULL; 
} 

// This is the main worker loop. 
void ThreadPool::workerStart() 
{ 
    while(!finished) 
    { 
     std::auto_ptr<Job> job = getJob(); 
     if (job.get() != NULL) 
     { 
      job->doWork(); 
     } 
    } 
} 

// The workers come here to get a job. 
// If there are non in the queue they are suspended waiting on cond 
// until a new job is added above. 
std::auto_ptr<Job> ThreadPool::getJob() 
{ 
    MutexLock lock(mutex); 

    while((workQueue.empty()) && (!finished)) 
    { 
     pthread_cond_wait(&cond, &mutex); 
     // The wait releases the mutex lock and suspends the thread (until a signal). 
     // When a thread wakes up it is help until it can acquire the mutex so when we 
     // get here the mutex is again locked. 
     // 
     // Note: You must use while() here. This is because of the situation. 
     // Two workers: Worker A processing job A. 
     //     Worker B suspended on condition variable. 
     // Parent adds a new job and calls signal. 
     // This wakes up thread B. But it is possible for Worker A to finish its 
     // work and lock the mutex before the Worker B is released from the above call. 
     // 
     // If that happens then Worker A will see that the queue is not empty 
     // and grab the work item in the queue and start processing. Worker B will 
     // then lock the mutext and proceed here. If the above is not a while then 
     // it would try and remove an item from an empty queue. With a while it sees 
     // that the queue is empty and re-suspends on the condition variable above. 
    } 
    std::auto_ptr<Job> result; 
    if (!finished) 
    { result.reset(workQueue.front()); 
     workQueue.pop_front(); 
    } 

    return result; 
} 
+0

条件变量是一个互斥/信号量? – ultifinitus 2011-04-27 06:08:54

+0

@ultifinitus:No.条件变量是线程代码中最低级别的基元(以及互斥体)。您可以从互斥/条件变量中构建信号量。 – 2011-04-27 06:28:54

+0

哈哈谢谢!我感谢帮助!我喜欢'我把这当做锻炼的一部分。我会告诉你最终产品是如何工作的。 – ultifinitus 2011-04-29 13:19:13

2

与多个消费者(工作线程消耗的工作请求)经典的生产者 - 消费者同步。众所周知的技术是有一个信号量,每个工作线程down()和每次你有工作请求,做up()。比从互斥锁工作队列中选择请求。由于一个up()只会唤醒一个down(),实际上对互斥锁的争用实际上是最小的。

或者,你可以做同样的条件变量,每个线程执行的等待和唤醒一个,当你有工作。队列本身仍然与互斥体锁定(condvar无论如何都需要一个)。

最后我不能完全肯定,但其实我觉得你可以实际使用的管道作为队列在内的所有同步(工作线程无非是想“读(的sizeof(要求))”)。有点乱七八糟,但导致更少的上下文切换。

+0

信号是简单的解决方案。它通常作为互斥体实现为条件变量和整数计数。但是,除此之外,您还必须确保您控制对任何其他共享资源的线程访问权限(例如待处理工作请求的列表)。 – 2011-04-27 16:55:01

+0

@Martin:我描述了两种方式,并明确表示队列必须被锁定。由于同步原语的等价性,选择取决于在给定环境中哪个更有效。事实上,这只是实现消息队列,所以当有一个可用时,就使用它(三个等效的同步原语是信号量,条件变量和消息队列)。 – 2011-04-28 05:09:05

1

做到这一点,最简单的方法是semaphores。这是一个信号是如何工作的:

信号量基本上是一个变量,它取空/正值。进程可以通过两种方式与它进行交互:增加或减少信号量。

增加信号量将1加到这个神奇的变量,就是这样。它减少了事物变得有趣的数量:如果计数达到零并且进程试图再次降低它,因为它不能取负值,它将会阻止块,直到变量上升

如果多个进程块正在等待减小信号量值,则每增加一个计数单位,只有一个被唤醒。

这使得创建工作/任务系统变得非常容易:您的经理进程对任务进行排队并增加信号量的值以匹配其余项目,并且您的工作进程会不断减少计数并获取任务。当没有可用的任务时,它们将阻塞,并且不消耗CPU时间。当出现时,只有一个休眠过程会唤醒。 Insta-sync魔术。

不幸的是,至少在Unix世界中,信号量API并不是非常友好,因为它由于某种原因而处理sempahores数组而不是单个数组。但是,你是一个简单的包装,远离一个漂亮的界面!

干杯!

+0

这几乎完美!我肯定会做一些大量的研究,谢谢你的回应! – ultifinitus 2011-04-27 06:09:49

3

通常的做法是让队列queue出色的工作,保护队列的互斥锁mutex,以及等待条件queue_not_empty。然后,每个工作线程将执行以下操作(使用伪API):

while (true) { 
    Work * work = 0; 
    mutex.lock(); 
    while (queue.empty()) 
     if (!queue_not_empty.wait(&mutex, timeout)) 
      return; // timeout - exit the worker thread 
    work = queue.front(); 
    queue.pop_front(); 
    mutex.unlock(); 
    work->perform(); 
} 

wait(&mutex, timeout)呼叫阻塞,直到任一等待条件被发信号或呼叫超时。通过的mutexwait()内原子解锁,并在从呼叫返回之前再次锁定,以向所有参与者提供队列的一致视图。 timeout将被选择为相当大(秒),并且会导致线程退出(如果有更多工作进入,线程池将开始新线程)。

同时,线程池的工作插入功能做到这一点:

Work * work = ...; 
mutex.lock(); 
queue.push_back(work); 
if (worker.empty()) 
    start_a_new_worker(); 
queue_not_empty.wake_one(); 
mutex.unlock(); 
+0

这就是我想的,谢谢你的超时,我以为我需要实施某种模具工作..我很感激! – ultifinitus 2011-04-27 06:07:49

2

由于网络聊天程序大概是I/O密集型而非CPU绑定的,你并不真正需要的线程。您可以使用诸如Boost.AsioGLib main loop等工具在单个线程中处理所有I/O。这些是针对特定于平台的功能的可移植抽象,这些功能允许程序阻止(可能很大的)一组打开的文件或套接字等待任意的活动,然后在活动发生时立即唤醒并作出响应。

+0

我一直在单线程select()和poll()上做所有事情,我只是担心速度......有没有任何需要? – ultifinitus 2011-04-27 06:06:25

+0

@ultifinitus,除非您的程序执行了大量的CPU工作(例如每个连接的套接字上的SSL加密),否则它可能大部分时间都处于空闲状态,等待客户端的输入,并且在输入时仅使用CPU到达。在多核内分配这样的工作量没有任何好处。 – Wyzard 2011-04-27 06:13:48

+0

@Wyzard:太棒了!我应该有一个备份系统(多线程),你觉得呢?如果你说的是真的,我**完**! – ultifinitus 2011-04-27 06:25:33