2010-03-03 281 views
2

(简而言之:main()的WaitForSingleObject挂起在下面的程序中)。ReleaseSemaphore不释放信号量

我正在尝试写一段代码调度线程并等待它们在恢复之前完成。我不是每次都创建线程,而是花费很大,所以我让他们睡着了。主线程以CREATE_SUSPENDED状态创建X个线程。

同步使用X作为MaximumCount的信号量完成。信号量的计数器被置零,线程被调度。在他们进入睡眠状态之前,Thred会执行一些愚蠢的循环并调用ReleaseSemaphore。然后,主线程使用WaitForSingleObject X来确保每个线程完成其工作并正在休眠。然后它循环并再次完成。

程序不时退出。当我扼杀程序,我可以看到WaitForSingleObject挂起。这意味着线程的ReleaseSemaphore不起作用。没有什么是printf'ed所以没有出错。

也许两个线程不应调用ReleaseSemaphore在精确的同一时间,但是这将抵消信号量的目的...

我只是不神交它...

其他解决方案同步线程被感激地接受!

#define TRY 100 
#define LOOP 100 

HANDLE *ids; 
HANDLE semaphore; 

DWORD WINAPI Count(__in LPVOID lpParameter) 
{ 
float x = 1.0f; 
while(1) 
{ 
    for (int i=1 ; i<LOOP ; i++) 
    x = sqrt((float)i*x); 
    while (ReleaseSemaphore(semaphore,1,NULL) == FALSE) 
    printf(" ReleaseSemaphore error : %d ", GetLastError()); 
    SuspendThread(ids[(int) lpParameter]); 
} 
return (DWORD)(int)x; 
} 

int main() 
{ 
SYSTEM_INFO sysinfo; 
GetSystemInfo(&sysinfo); 
int numCPU = sysinfo.dwNumberOfProcessors; 

semaphore = CreateSemaphore(NULL, numCPU, numCPU, NULL); 
ids = new HANDLE[numCPU]; 

for (int j=0 ; j<numCPU ; j++) 
    ids[j] = CreateThread(NULL, 0, Count, (LPVOID)j, CREATE_SUSPENDED, NULL); 

for (int j=0 ; j<TRY ; j++) 
{ 
    for (int i=0 ; i<numCPU ; i++) 
    { 
    if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT) 
    printf("Timed out !!!\n"); 
    ResumeThread(ids[i]); 
    } 
    for (int i=0 ; i<numCPU ; i++) 
    WaitForSingleObject(semaphore,INFINITE); 
    ReleaseSemaphore(semaphore,numCPU,NULL); 
} 
CloseHandle(semaphore); 
printf("Done\n"); 
getc(stdin); 
} 

回答

3

在下列情况下,问题发生:

主线程恢复工作线程:

for (int i=0 ; i<numCPU ; i++) 
    { 
    if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT) 
    printf("Timed out !!!\n"); 
    ResumeThread(ids[i]); 
    } 

工作者线程做他们的工作,并释放信号:

for (int i=1 ; i<LOOP ; i++) 
    x = sqrt((float)i*x); 
    while (ReleaseSemaphore(semaphore,1,NULL) == FALSE) 

所有工作线程的主线程等待和重置信号:

for (int i=0 ; i<numCPU ; i++) 
    WaitForSingleObject(semaphore,INFINITE); 
    ReleaseSemaphore(semaphore,numCPU,NULL); 

主线程进入下一轮,试图恢复工作线程(请注意,工作线程没有事件暂停自己呢!这是问题开始的地方......你试图恢复不一定被暂停尚)主题:

for (int i=0 ; i<numCPU ; i++) 
    { 
    if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT) 
    printf("Timed out !!!\n"); 
    ResumeThread(ids[i]); 
    } 

终于工作者线程挂起自己(虽然他们应该已经开始下一轮):

SuspendThread(ids[(int) lpParameter]); 

因为所有的工人正在悬挂在主线程永远等待:

for (int i=0 ; i<numCPU ; i++) 
    WaitForSingleObject(semaphore,INFINITE); 

这里有一个链接,显示了如何正确地解决生产者/消费者问题:

http://en.wikipedia.org/wiki/Producer-consumer_problem

也是我认为critical sections比信号量和互斥体快得多。他们在大多数情况下也更容易理解(imo)。

+0

不错的理论/解释。在恢复线程之前将主要进入睡眠状态(1)似乎解决了这个问题,但随后性能消失了。至少这证实了这个理论:主要的恢复线程还没有睡觉\ o / – Gabriel 2010-03-03 22:57:04

3

我不理解的代码,但是线程同步肯定是不好的。您假定线程将按特定顺序调用SuspendThread()。一个成功的WaitForSingleObject()调用不会告诉你线程调用ReleaseSemaphore()。因此,您将在未挂起的线程上调用ReleaseThread()。这很快就使程序陷入僵局。

另一个不好的假设是一个线程在WFSO返回后已经调用了SuspendThread。通常是的,并不总是。该线程可以在RS呼叫之后立即被抢占。您将再次在未挂起的线程上调用ReleaseThread()。这通常需要一天左右的时间来锁定你的程序。

我觉得有一个ReleaseSemaphore调用太多了。毫无疑问,试图解开它。

你不能用Suspend/ReleaseThread()来控制线程,不要尝试。

+0

好吧,我不承担任何SuspendThread顺序。最内层的循环实际上是以任意顺序恢复线程,但随后我调用WaitForSingleObject的numCPU次数,这将等待numCPU ReleaseSemaphore(),这可能以任何顺序发生。 – Gabriel 2010-03-03 22:26:15

+0

@ Gabriel:我不认为你会解决这个问题,直到你看到在未挂起的线程上调用ReleaseThread是你的主要问题。 – 2010-03-04 00:30:55

+0

我没能理解你的第二段。现在我明白了,这与stmax'一样。感谢您的解释! – Gabriel 2010-03-04 13:04:27

4

而不是使用信号量(至少直接)或主要显式唤醒线程来完成一些工作,我总是使用线程安全队列。当main想要一个工作线程执行某些操作时,它会将该工作的描述推送到队列中。工作线程每个只做一个工作,然后尝试从队列中弹出另一个工作,并最终暂停,直到队列中有工作要做:

队列的代码如下所示:

#ifndef QUEUE_H_INCLUDED 
#define QUEUE_H_INCLUDED 

#include <windows.h> 

template<class T, unsigned max = 256> 
class queue { 
    HANDLE space_avail; // at least one slot empty 
    HANDLE data_avail; // at least one slot full 
    CRITICAL_SECTION mutex; // protect buffer, in_pos, out_pos 

    T buffer[max]; 
    long in_pos, out_pos; 
public: 
    queue() : in_pos(0), out_pos(0) { 
     space_avail = CreateSemaphore(NULL, max, max, NULL); 
     data_avail = CreateSemaphore(NULL, 0, max, NULL); 
     InitializeCriticalSection(&mutex); 
    } 

    void push(T data) { 
     WaitForSingleObject(space_avail, INFINITE);  
     EnterCriticalSection(&mutex); 
     buffer[in_pos] = data; 
     in_pos = (in_pos + 1) % max; 
     LeaveCriticalSection(&mutex); 
     ReleaseSemaphore(data_avail, 1, NULL); 
    } 

    T pop() { 
     WaitForSingleObject(data_avail,INFINITE); 
     EnterCriticalSection(&mutex); 
     T retval = buffer[out_pos]; 
     out_pos = (out_pos + 1) % max; 
     LeaveCriticalSection(&mutex); 
     ReleaseSemaphore(space_avail, 1, NULL); 
     return retval; 
    } 

    ~queue() { 
     DeleteCriticalSection(&mutex); 
     CloseHandle(data_avail); 
     CloseHandle(space_avail); 
    } 
}; 

#endif 

在线程中使用它的大致等价代码看起来像这样。我没有完全理清你的线程函数在做什么,但是它是用平方根求和的东西,显然你对线程同步更感兴趣,而不是线程实际做的那一刻。

编辑:(基于评论): 如果您需要main()等待一些任务来完成,做更多的工作,然后分配更多的任务,它通常是最好的处理,通过把一个事件(例如)成每个任务,并让你的线程函数设置事件。修改后的代码要做到这一点应该是这样的(注意,队列代码不受影响):

#include "queue.hpp" 

#include <iostream> 
#include <process.h> 
#include <math.h> 
#include <vector> 

struct task { 
    int val; 
    HANDLE e; 

    task() : e(CreateEvent(NULL, 0, 0, NULL)) { } 
    task(int i) : val(i), e(CreateEvent(NULL, 0, 0, NULL)) {} 
}; 

void process(void *p) { 
    queue<task> &q = *static_cast<queue<task> *>(p); 

    task t; 
    while (-1 != (t=q.pop()).val) { 
     std::cout << t.val << "\n"; 
     SetEvent(t.e); 
    } 
} 

int main() { 
    queue<task> jobs; 

    enum { thread_count = 4 }; 
    enum { task_count = 10 }; 

    std::vector<HANDLE> threads; 
    std::vector<HANDLE> events; 

    std::cout << "Creating thread pool" << std::endl; 
    for (int t=0; t<thread_count; ++t) 
     threads.push_back((HANDLE)_beginthread(process, 0, &jobs)); 
    std::cout << "Thread pool Waiting" << std::endl; 

    std::cout << "First round of tasks" << std::endl; 

    for (int i=0; i<task_count; ++i) { 
     task t(i+1); 
     events.push_back(t.e); 
     jobs.push(t); 
    } 

    WaitForMultipleObjects(events.size(), &events[0], TRUE, INFINITE); 

    events.clear(); 

    std::cout << "Second round of tasks" << std::endl; 

    for (int i=0; i<task_count; ++i) { 
     task t(i+20); 
     events.push_back(t.e); 
     jobs.push(t); 
    } 

    WaitForMultipleObjects(events.size(), &events[0], true, INFINITE); 
    events.clear(); 

    for (int j=0; j<thread_count; ++j) 
     jobs.push(-1); 

    WaitForMultipleObjects(threads.size(), &threads[0], TRUE, INFINITE); 

    return 0; 
} 
+0

据我了解你的代码,同步发生在最后一个WaitForMultipleObjects,当你等待所有的线程死亡。我不能让他们死。代码处于无限循环中,需要线程休眠一段时间,然后进行更多工作。 我需要确保所有的工作都由主线程前的线程完成,并在返回到那些线程之前执行其他的工作,并为他们提供更多的工作。我不能重新创建新线程,这太慢了,即使使用_beginthreads()。 – Gabriel 2010-03-03 22:42:48

+0

不错的解决方案。然而,一个观察结果是线程和事件中的所有句柄都会泄漏。他们需要用CloseHandle清理清理。 – 2015-10-06 13:19:08

0

问题是,您比信号发送更频繁地等待。

for (int j=0 ; j<TRY ; j++)的循环等待八倍信号量,而四个线程将只有信号每一次和环路本身信号一次。第一次通过循环,这不是问题,因为信号量的初始值为4。第二次和以后的每一次,你都在等待太多的信号。这可以通过在前四个等待时间限制时间并且不重试错误的事实来缓解。所以有时候它可能会奏效,有时候你会等待。

我认为以下(未经测试)的更改将有所帮助。

信号量初始化为零计数:

semaphore = CreateSemaphore(NULL, 0, numCPU, NULL); 

摆脱线程恢复环路的等待(即删除以下):

if (WaitForSingleObject(semaphore,1) == WAIT_TIMEOUT) 
     printf("Timed out !!!\n"); 

从试循环结束卸下外来信号(即除去以下内容):

ReleaseSemaphore(semaphore,numCPU,NULL); 
0

这里是一个实用的解决方案。

我希望我的主程序使用线程(然后使用多个核心)来完成作业并等待所有线程完成,然后再继续执行其他任务。我不想让线程死亡并创建新线程,因为这很慢。在我的问题中,我试图通过暂停线程来做到这一点,这看起来很自然。但正如nobugz指出的,“你可以用Suspend/ReleaseThread()来控制线程。”

解决方案涉及像我用来控制线程的信号量。实际上一个信号量被用来控制主线程。现在我每个线程都有一个信号来控制线程和一个信号来控制主线程。

这里是解决方案:

#include <windows.h> 
#include <stdio.h> 
#include <math.h> 
#include <process.h> 

#define TRY 500000 
#define LOOP 100 

HANDLE *ids; 
HANDLE *semaphores; 
HANDLE allThreadsSemaphore; 

DWORD WINAPI Count(__in LPVOID lpParameter) 
{ 
    float x = 1.0f;   
    while(1) 
    { 
     WaitForSingleObject(semaphores[(int)lpParameter],INFINITE); 
     for (int i=1 ; i<LOOP ; i++) 
      x = sqrt((float)i*x+rand()); 
     ReleaseSemaphore(allThreadsSemaphore,1,NULL); 
    } 
    return (DWORD)(int)x; 
} 

int main() 
{ 
    SYSTEM_INFO sysinfo; 
    GetSystemInfo(&sysinfo); 
    int numCPU = sysinfo.dwNumberOfProcessors; 

    ids = new HANDLE[numCPU]; 
    semaphores = new HANDLE[numCPU]; 

    for (int j=0 ; j<numCPU ; j++) 
    { 
     ids[j] = CreateThread(NULL, 0, Count, (LPVOID)j, NULL, NULL); 
     // Threads blocked until main releases them one by one 
     semaphores[j] = CreateSemaphore(NULL, 0, 1, NULL); 
    } 
    // Blocks main until threads finish 
    allThreadsSemaphore = CreateSemaphore(NULL, 0, numCPU, NULL); 

    for (int j=0 ; j<TRY ; j++) 
    { 
     for (int i=0 ; i<numCPU ; i++) // Let numCPU threads do their jobs 
      ReleaseSemaphore(semaphores[i],1,NULL); 
     for (int i=0 ; i<numCPU ; i++) // wait for numCPU threads to finish 
      WaitForSingleObject(allThreadsSemaphore,INFINITE); 
    } 
    for (int j=0 ; j<numCPU ; j++) 
     CloseHandle(semaphores[j]); 
    CloseHandle(allThreadsSemaphore); 
    printf("Done\n"); 
    getc(stdin); 
}