2014-10-21 102 views
0

我想实现一个线程安全的队列映射。线程安全队列映射

我打算从空地图开始。如果该键不存在,我想用新的队列创建一个新的Map条目。如果密钥确实存在,我想添加到队列中。我的建议的实施情况如下:

import java.util.Map; 
import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.ConcurrentLinkedQueue; 

public class StackOverFlowExample { 

    private final Map<String, ConcurrentLinkedQueue<String>> map = new ConcurrentHashMap<>(); 

    public void addElementToQueue(String key, String value){ 
     if (map.containsKey(key)){ 
      map.get(key).add(value); 
     } 
     else{ 
      ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>(); 
      queue.add(value); 
      map.put(key, queue); 
     }   
    }  
} 

我担心的是,当多个线程试图将一个新值添加到地图时,首先会放置一个新的队列中的新地图项,第二等待,然后为该密钥添加一个新的队列,而不是添加到队列中。我的并发/并发API知识很少。或许并发性是为了避免这种情况?建议将不胜感激。

回答

2

这种模式可能已经发布多次对SO(有效地增加了并发图):

Queue<String> q = map.get(key); 
if(q == null) { 
    q = new ConcurrentLinkedQueue<String>(); 
    Queue<String> curQ = map.putIfAbsent(key, q); 
    if(curQ != null) { 
    q = curQ; 
    } 
} 
q.add(value); 
0

所以你的担心是,线程A和线程B将执行以下操作:

thread A: lock ConcurrentHashMap Look for Queue "x" (not found) unlock ConcurrentHashMap create Queue "x" lock ConcurrentHashMap Insert Queue X unlock ConcurrentHashMap Thread B: Lock ConcurrentHashMap (while thread A is in 'create Queue X') look for queue X (not found) unlock ConcurrentHashMap (thread A then gets lock) create Queue "x" v2 lock ConcurrentHashMap Insert Queue X v2 (overwriting the old entry) unlock ConcurrentHashMap

这事实上是一个真正的问题,而且是一个很容易做出AddElementToQueue是一个同步的方法解决。然后,在任何给定时间内,AddElementToQueue中只能有一个线程,因此第一个“解锁”和第二个“锁定”之间的同步孔关闭。

因此

public synchronized void addElementToQueue(String key, String value){

应该解决您丢失的队列问题。

+0

那会明智的,如果一个ConcurrentHashMap在检索时使用锁定。它没有。 – spudone 2014-10-21 22:56:06

+0

啊对,ConcurrentHashMap“仅仅”使用了多读者弹性的数据结构。锁定只能由作家在从地图添加或删除元素所需的短暂时间内使用。我一直忘记这个实现细节,因为从程序的角度来看,这并不重要。这个问题仍然发生在同一个地方(同时“X存在于HashMap?”与“创建新HashMap”同时)并且解决方案是相同的 - 同步addElementToQueue方法。 – 2014-10-21 23:33:34

+0

使用synchronized块,他并不需要并发数据结构,除非代码更多。 – spudone 2014-10-21 23:41:53

0

如果你的Java 8是一个选项:

public void addElementToQueue(String key, String value) { 
    map.merge(key, new ConcurrentLinkedQueue<>(Arrays.asList(value)), (oldValue, coming) -> { 
     oldValue.addAll(coming); 
     return oldValue; 
    }); 
}