2016-09-23 194 views
-1

我在写一个基本的生产者/消费者线程代码。我得到了大部分的工作,但我有一个问题与兰特()。或者,也许我的问题比rand()更深,而rand只是冰山一角。我不想做任何太复杂的事情(无法运行或等待)。多线程生产者/消费者

我有一个全局的整数作为缓冲区。我允许用户输入大小和运行时间的限制。 我让计数器成为一个静态全局变量。 这是我的制片人:

DWORD WINAPI producer(LPVOID n) 
{ 
    cout << "\nPRODUCER:The producer is running right now" << endl; 
    int size = (int)n; 
    int num = rand()%10;// this is for making item. 
    while (buffer.size() > size) 
    { 
    } 

    buffer.push_back(num); 
    counter++; 
    return (DWORD)n; 

} 

这是我consumer--

DWORD WINAPI consumer(LPVOID n) 
{ 

    cout << "\nCONSUMER:The consumer is running right now" << endl; 
    while (buffer.empty()) 
    { } 
    int item= buffer.front(); 
    cout << "\nCONSUMER:The consumer ate" << item << endl; 
    counter++; 

    return (DWORD)n; 


    } 
在main-

while (counter < l) 
    { 
    hThreads[0] = CreateThread(NULL, 0, producer, (LPVOID)s, NULL, &id[0]); 
    hThreads[1] = CreateThread(NULL, 0, consumer, (LPVOID)l, NULL, &id[1]); 

    waiter = WaitForMultipleObjects(MAX_THREADS, hThreads, TRUE, INFINITE); 
    } 
    for (int i = 0; i < MAX_THREADS; i++) { 
    CloseHandle(hThreads[i]); 
    } 

我的输出是这样的: enter image description here

所以每次只产生1个。 Srand也没有工作。但它运行正确的次数。

编辑--- 所以我固定的生产和消费,以应对竞争条件:

DWORD WINAPI producer(LPVOID s) 
    { 
    WaitForSingleObject(Empty, INFINITE);  
    WaitForSingleObject(Mutex, INFINITE); 
    cout << "\nPRODUCER...." << endl; 
    int size = (int)s; 
    srand(size); 
    int in = rand() % 10; 
    cout << "THIS IS IN:::" << in << endl; 
    while (buffer.size() == size) 
    { 
     ReleaseMutex(Mutex); 
     } 
    buffer.push_back(in); 
    counter++; 
    cout << "\nThe producer produces " << buffer.front() << endl; 
    ReleaseMutex(Mutex); 
    ReleaseSemaphore(Full, 1, NULL); 

    return (DWORD)s; 
    } 



    DWORD WINAPI consumer(LPVOID l) 
    { 
     WaitForSingleObject(Full, INFINITE); 
     WaitForSingleObject(Mutex, INFINITE); 
     cout << "\nCONSUMER...." << endl; 
     while (buffer.empty()) 
     { 
      ReleaseMutex(Mutex); 

     } 
    int out = buffer.front(); 
    counter++; 
    ReleaseMutex(Mutex); 
    ReleaseSemaphore(Empty, 1, NULL); 
    return (DWORD)l; 
    } 

但随意的事情仍保持演戏了。它只会一直重复生成一个数字(即使播种时也是如此)。

+0

那么同步在哪里呢? – Mysticial

+0

如果消费者没有任何东西先运行,那么生产者就会进入,然后消费者就会从中断。这取决于首先启动哪个 –

+0

缓冲区上存在未受保护的竞争条件。这是未定义的行为。 – Mysticial

回答

0

是的,创建(并销毁)一个线程来创建或处理一个数字是没有意义的 - 额外的开销不值得。加上你的代码(就像)有一些非常明显的错误或误解。它们是:

  • 在主线程中创建工作线程(在while(){}循环中),但只在最后一次销毁它们,那就是只销毁最后一个循环中创建的句柄。
  • 正如我的消息所述,srand()被调用来生成每个数字,并始终使用相同的初始种子,因此获得相同的数字是正常的。
  • while()循环检查缓冲区是空还是满是没有意义的,并且不应释放互斥锁。
  • counter变量的操作可能是错误的。生产者和消费者线程都增加它,主线程使用它来确定生成/打印的数量。
  • 在您的初始代码片段中,counter是一个全局变量,其上运行着多个线程,因此您应该以线程安全的方式读取或修改它,而不是以这种方式。您应该使用一些锁定机制,如关键部分或互锁变量访问。

我的建议将是创建一个生产者线程(产生的所有数字)和一个消费者线程(打印所有数),通过缓冲器传送。为此,您将需要以下项目(您已经实现了其中的大部分):

  • 一个“完整”信号量,计算缓冲区中的数字,最初为0。
  • 一个互补的“Empty”信号量,计算缓冲区中的空项目,最初设置为缓冲区大小。这些信号量的“总和”当然总是等于缓冲区大小。
  • 用于访问缓冲区的互斥锁(或临界区)。
  • 一个全局变量,用于告诉线程是否退出。

我在这里发布以下代码示例。不知道是否他们的工作,你可能需要修改或进行调试,但是这仅仅是展示的概念:

// Global 
#define MAX_BUF 5 
BOOL bExit = FALSE; 

// Main Thread 
    Empty = CreateSemaphore(NULL, MAX_BUF, MAX_BUF, NULL); 
    Full = CreateSemaphore(NULL, 0, MAX_BUF, NULL); 
    . 
    . 
    hThreads[0] = CreateThread(NULL, 0, producer, (LPVOID)l, NULL, &id[0]); 
    hThreads[1] = CreateThread(NULL, 0, consumer, (LPVOID)l, NULL, &id[1]); 
    waiter = WaitForMultipleObjects(MAX_THREADS, hThreads, TRUE, INFINITE); 

    for (int i = 0; i < MAX_THREADS; i++) 
     CloseHandle(hThreads[i]); 


DWORD WINAPI producer(LPVOID nCount) 
{ 
    int nItems = (int)nCount; 
    // Initialize rand() seed - each run will be generating a different sequence 
    srand(GetTickCount()); // May need to AND GetTickCount() with RAND_MAX ??? 
    // Generate nCount numbers 
    for (int i = 0; i < nItems; i++) 
    { 
     if (bExit) return 9; // Aborted 
     WaitForSingleObject(Empty, INFINITE); // Wait until at least one item empty 
     // Lock the buffer and add an item 
     WaitForSingleObject(Mutex, INFINITE); // Could be EnterCriticalSection() instead 
     if (buffer.size() >= MAX_BUF) 
     { 
      cout << "\nInternal Error: Buffer-full Check Failed!" << endl; 
      bExit = TRUE; // Tell all threads to exit 
      ReleaseMutex(Mutex); 
      return 1; // Exit with Error 
     } 
     int in = rand() % 10; 
     buffer.push_back(in); 
     cout << "The Producer generated: " << in << endl; 
     ReleaseMutex(Mutex); // Could be LeaveCriticalSection() instead 
     ReleaseSemaphore(Full, 1, NULL); // 1 item added, update semaphore 
    } 
    cout << "\nThe PRODUCER produced " << nItems << " items." << endl; 
    return 0; // OK 
} 

DWORD WINAPI consumer(LPVOID nCount) 
{ 
    int nItems = (int)nCount; 
    // Generate nCount numbers 
    for (int i = 0; i < nItems; i++) 
    { 
     if (bExit) return 9; // Aborted 
     WaitForSingleObject(Full, INFINITE); // Wait until at least one item in buffer 
     // Lock the buffer and get an item 
     WaitForSingleObject(Mutex, INFINITE); // Could be EnterCriticalSection() instead 
     if (buffer.empty()) 
     { 
      cout << "\nInternal Error: Buffer-empty Check Failed!" << endl; 
      bExit = TRUE; // Tell all threads to exit 
      ReleaseMutex(Mutex); 
      return 2; // Exit with Error 
     } 
     int out = buffer.front(); 
     buffer.erase(buffer.begin()); // Remove item from the list 
     cout << "The Consumer ate: " << out << endl; 
     ReleaseMutex(Mutex); // Could be LeaveCriticalSection() instead 
     ReleaseSemaphore(Empty, 1, NULL); // 1 item removed, update semaphore 
    } 
    cout << "\nThe CONSUMER consumed " << nItems << " items." << endl; 
    return 0; // OK 
} 

注:

  • bExit变量是全球性的访问受到越来越/修改而不是一个线程,但因为这总是在关键部分(互斥锁)内完成的,所以不需要使用另一个线程或互锁变量访问。
  • 诊断消息(例如cout << "The Consumer ate: " << out << endl;)或任何其他类型的“处理”这些数据(或在其上“工作”)可能已经在释放对象后被放置。这会更高效,将对象提前释放给其他线程。我已经这样做了,以更好地说明测试中的事件序列。
  • 如果您将MAX_BUF设置为1,则应该一次生成/打印一个数字,否则无法确定,但生成的项目减去消耗的项目当然不会超过缓冲区大小。