2016-04-22 488 views
2

真正的代码方式更复杂,但我认为我设法制作了一个mcve。C++线程安全和notify_all()

我试图做到以下几点:

  1. 有一些线程做的工作
  2. 把他们都进入暂停状态
  3. 唤醒他们第一次,等待它完成,再醒来第二个,等待它完成,唤醒了第三个。等等。

我使用的代码下面,它似乎工作

std::atomic_int which_thread_to_wake_up; 
std::atomic_int threads_asleep; 
threads_asleep.store(0); 
std::atomic_bool ALL_THREADS_READY; 
ALL_THREADS_READY.store(false); 
int threads_num = .. // Number of threads 
bool thread_has_finished = false; 

std::mutex mtx; 
std::condition_variable cv; 

std::mutex mtx2; 
std::condition_variable cv2; 


auto threadFunction = [](int my_index) { 

    // some heavy workload here.. 
    .... 

    { 
     std::unique_lock<std::mutex> lck(mtx); 
     ++threads_asleep; 
     cv.notify_all(); // Wake up any other thread that might be waiting 
    } 

    std::unique_lock<std::mutex> lck(mtx); 
    bool all_ready = ALL_THREADS_READY.load(); 
    size_t index = which_thread_to_wake_up.load(); 
    cv.wait(lck, [&]() { 
     all_ready = ALL_THREADS_READY.load(); 
     index = which_thread_to_wake_up.load(); 
     return all_ready && my_index == index; 
    }); 

    // This thread was awaken for work! 
    .. do some more work that requires synchronization.. 

    std::unique_lock<std::mutex> lck2(mtx2); 
    thread_has_finished = true; 
    cv2.notify_one(); // Signal to the main thread that I'm done 
}; 

// launch all the threads.. 
std::vector<std::thread> ALL_THREADS; 
for (int i = 0; i < threads_num; ++i) 
    ALL_THREADS.emplace_back(threadFunction, i);  


// Now the main thread needs to wait for ALL the threads to finish their first phase and go to sleep  

std::unique_lock<std::mutex> lck(mtx); 
size_t how_many_threads_are_asleep = threads_asleep.load(); 
    while (how_many_threads_are_asleep < threads_num) { 
     cv.wait(lck, [&]() { 
     how_many_threads_are_asleep = threads_asleep.load(); 
     return how_many_threads_are_asleep == numThreads; 
     }); 
    } 

// At this point I'm sure ALL THREADS ARE ASLEEP! 

// Wake them up one by one (there should only be ONE awake at any time before it finishes his computation) 

for (int i = 0; i < threads_num; i++) 
{ 
    which_thread_to_wake_up.store(i); 
    cv.notify_all(); // (*) Wake them all up to check if they're the chosen one 
    std::unique_lock<std::mutex> lck2(mtx2); 
    cv2.wait(lck, [&]() { return thread_has_finished; }); // Wait for the chosen one to finish 
    thread_has_finished = false; 
} 

恐怕最后notify_all()调用(一个我标有(*))可能会导致以下情况:

  • 所有线程都睡着了
  • 所有线程都从唤醒主线程通过调用notify_all()
  • 具有正确索引的线程完成最后一次计算并释放锁
  • 所有其他线程■找被唤醒,但他们有没有选中原子变量YET
  • 主线程发出第二notify_all(),这都丢失了(因为线程都惊醒的是,他们并没有简单地检查了原子能还)

这会发生吗?我找不到notify_all()的任何措辞,如果它的电话是缓冲或与实际检查条件变量的函数同步的顺序。

+0

我没有仔细研究过你的代码,但是你描述的一般情况是可能的。条件变量不存储信号。只有那些在发送信号时等待条件变量的线程才能看到该信号。由信号唤醒的线程不再等待,即使它们还没有从'wait()'函数返回。 –

回答

0

按上(notify_all

notify_all的文档只有一半的要求,继续一个线程。条件陈述也必须是真实的。所以必须有一个交通警察设计来唤醒第一个,唤醒第二个,唤醒第三个。 notify函数通知线程检查该条件。

我的答案是比特定代码更高的水平,但我希望这有助于。

0

您考虑的情况可能会发生。如果在调用notify_all()时唤醒了工作线程(从属),那么他们可能会错过该信号。

防止这种情况的一种方法是在cv.notify_all()之前锁定mtx,然后解锁。如wait()文档中建议的那样,锁被用作pred()访问的保护。如果主线程获得mtx,则其他线程不会在同一时刻检查条件。虽然他们当时可能正在做其他工作,但在您的代码中,他们不太可能再次输入wait