2012-08-08 57 views
6

我有一个多线程应用程序,创建48个线程,都需要访问一个公共属性(stl :: map)。地图将只在线程启动时写入,剩下的时间将读取地图。这看起来像是pthread_rw_lock的完美用例,并且看起来都很好。pthread_rwlock可同时拥有多少个读者?

我碰到了一个完全不相关的seg-fault并开始分析核心。使用gdb,我执行了命令info threads,对结果感到非常惊讶。我观察到有几个线程实际上是按照预期从映射中读取的,但奇怪的是多个线程在等待rw_lock的pthread_rwlock_rdlock()中被阻塞。

这里是一个线程的堆栈跟踪的是上了锁的等待:

#0 0xffffe430 in __kernel_vsyscall() 
#1 0xf76fe159 in __lll_lock_wait() from /lib/libpthread.so.0 
#2 0xf76fab5d in pthread_rwlock_rdlock() from /lib/libpthread.so.0 
#3 0x0804a81a in DiameterServiceSingleton::getDiameterService(void*)() 

有了这么多的线程,其很难说有多少人阅读,多少被封锁,但我不明白为什么任何考虑到其他线程已经在读取,线程将被阻塞等待读取。

所以,这里是我的问题:为什么有些线程阻塞等待读rw_lock,当其他线程已经从它读取?看起来似乎可以同时读取的线程数量有限制。

我看了看pthread_rwlock_attr_t函数并没有看到任何相关的。

操作系统是Linux,SUSE 11

下面是相关的代码:

{ 
    pthread_rwlock_init(&serviceMapRwLock_, NULL); 
} 

// This method is called for each request processed by the threads 
Service *ServiceSingleton::getService(void *serviceId) 
{ 
    pthread_rwlock_rdlock(&serviceMapRwLock_); 
    ServiceMapType::const_iterator iter = serviceMap_.find(serviceId); 
    bool notFound(iter == serviceMap_.end()); 
    pthread_rwlock_unlock(&serviceMapRwLock_); 

    if(notFound) 
    { 
    return NULL; 
    } 

    return iter->second; 
} 

// This method is only called when the app is starting 
void ServiceSingleton::addService(void *serviceId, Service *service) 
{ 
    pthread_rwlock_wrlock(&serviceMapRwLock_); 
    serviceMap_[serviceId] = service; 
    pthread_rwlock_unlock(&serviceMapRwLock_); 
} 

更新:

如由MarkB的评论中提到,如果我设置pthread_rwlockattr_getkind_np ()优先考虑作家,并且有一位作家被阻止等待,那么观察到的行为就会有意义。但是,我使用我认为优先考虑的读者的默认值。我只是验证了没有线程被阻塞等待写入。我也在@Shahbaz的建议中更新了代码,并获得了相同的结果。

+3

你确定*没有写作者锁定以及? – 2012-08-08 14:20:47

+0

@MarkB这是一个很好的问题!但是,这不依赖于我没有调用过的pthread_rwlockattr_getkind_np()吗?林不知道是否有线程正在等待写入,但他们不应该是因为这应该只在一开始发生。我必须检查。 – Brady 2012-08-08 14:22:23

+0

@MarkB,如果作者正在等待,我还没有设置pthread_rwlockattr_getkind_np()会有什么影响?据我了解,如果有连续的读者,作家可能会饿死,对吧? – Brady 2012-08-08 14:27:13

回答

6

您只是观察了获取锁所涉及的固有性能问题。这需要一些时间,而你恰好在它的中间捕捉到这些线程。当受锁保护的操作持续时间非常短时,情况尤其如此。

编辑:阅读源,glibc使用lll_lock它自己的并行线程库中的数据结构中保护关键部分。 pthread_rwlock_rdlock检查几个标志并增加计数器,所以它在锁定时执行这些操作。一旦完成,锁定将以lll_unlock发布。

为了演示,我实施了一个简短的例程,在获取rwlock后休息。主线程等待它们完成。但在等待之前,它会打印由线程实现的并发性。

enum { CONC = 50 }; 

pthread_rwlock_t rwlock; 
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 
pthread_cond_t cond = PTHREAD_COND_INITIALIZER; 
unsigned count; 

void *routine(void *arg) 
{ 
    int *fds = static_cast<int *>(arg); 
    pthread_rwlock_rdlock(&rwlock); 
    pthread_mutex_lock(&mutex); 
    ++count; 
    if (count == CONC) pthread_cond_signal(&cond); 
    pthread_mutex_unlock(&mutex); 
    sleep(5); 
    pthread_rwlock_unlock(&rwlock); 
    pthread_t self = pthread_self(); 
    write(fds[1], &self, sizeof(self)); 
    return 0; 
} 

并且计数器的主线程等待达到50:

int main() 
{ 
    int fds[2]; 
    pipe(fds); 
    pthread_rwlock_init(&rwlock, 0); 
    pthread_mutex_lock(&mutex); 
    for (int i = 0; i < CONC; i++) { 
     pthread_t tid; 
     pthread_create(&tid, NULL, routine, fds); 
    } 
    while (count < CONC) pthread_cond_wait(&cond, &mutex); 
    pthread_mutex_unlock(&mutex); 
    std::cout << "count: " << count << std::endl; 
    for (int i = 0; i < CONC; i++) { 
     pthread_t tid; 
     read(fds[0], &tid, sizeof(tid)); 
     pthread_join(tid, 0); 
    } 
    pthread_rwlock_destroy(&rwlock); 
    pthread_exit(0); 
} 

编辑:简化使用C++ 11线程支持的示例:

enum { CONC = 1000 }; 
std::vector<std::thread> threads; 

pthread_rwlock_t rwlock; 
std::mutex mutex; 
std::condition_variable cond; 
unsigned count; 

void *routine(int self) 
{ 
    pthread_rwlock_rdlock(&rwlock); 
    { std::unique_lock<std::mutex> lk(mutex); 
     if (++count == CONC) cond.notify_one(); } 
    sleep(5); 
    pthread_rwlock_unlock(&rwlock); 
    return 0; 
} 

int main() 
{ 
    pthread_rwlock_init(&rwlock, 0); 
    { std::unique_lock<std::mutex> lk(mutex); 
     for (int i = 0; i < CONC; i++) { 
      threads.push_back(std::thread(routine, i)); 
     } 
     cond.wait(lk, [](){return count == CONC;}); } 
    std::cout << "count: " << count << std::endl; 
    for (int i = 0; i < CONC; i++) { 
     threads[i].join(); 
    } 
    pthread_rwlock_destroy(&rwlock); 
    pthread_exit(0); 
} 
+0

我用等待读锁的线程的堆栈跟踪来更新我的问题。你的意思是说即使线程在__lll_lock_wait()中,它实际上并没有等待/阻塞,但它在函数中获得了读锁吗?如果是这样,这是一个不幸的函数名称,不会是第一个虽然:) – Brady 2012-08-08 15:42:13

+0

@是的。我没有在我面前确认的消息来源,但我怀疑这与glibc的系统调用入口点对应于内核。 – jxh 2012-08-08 15:57:29

+0

这是有道理的。不,我不相信你:)但我寻找的源代码来验证它。我会在找到它时通知您,谢谢! – Brady 2012-08-08 16:28:49

3

作为注意,上面贴出的代码被打破了。 你不能从rw_lock'd部分访问iter-> second,因为只要你解锁rw_lock,编写者就可以删除地图中的任何元素,从而使其上的任何迭代器失效。

我知道你没有在你的情况下这样做,因为你不会在程序执行开始时写入任何东西,但仍值得一提。另外,作为一个侧面说明,由于你描述的行为看起来像它的序列化(编写者在开始时写入地图,然后读者从现在开始阅读一张“只读”地图),你可能应该写它像这样:

int writerDoneWithMap = 0; 
// pthread_cond & mutex init here 

// The writer write to the map here 

// Then they signal the reader that they are done with it 
while (!__sync_bool_compare_and_swap(&writerDoneWithMap, 1, writerDoneWithMap)); 
pthread_cond_broadcast here 


// The readers will then simply do this: 
while (!writerDoneWithMap) 
{ 
    // pthread_cond_wait here 
} 
// Read the data without locks. 

上面的代码避免对读者的任何锁定,如果作家都填完了地图,并在情况下,他们没有,那么你就求助于典型pthread_cond /互斥工艺。 上面的代码是正确的,当且仅当你有作家那么读者(但这就是你说的),否则它会失败。

相关问题