1

我有一个班级,其中有一个ConcurrentHashMap,每30秒更新一个线程,然后通过调用getNextSocket()方法从同一个ConcurrentHashMap读取多个阅读器线程。从单个线程填充ConcurrentHashMap,然后从多个线程读取而没有任何竞争条件?

以下是我的单例类,它在初始化时调用connectToSockets()方法来填充我的ConcurrentHashMap,然后启动一个后台线程,通过调用updateSockets()方法每隔30秒更新一次相同的地图。

然后从多个线程我打电话getNextSocket()方法获得下一个可用的活套接字使用相同的地图来获取信息。我也有SocketInfo类是不可变的,它包含所有套接字的状态,不管它们是否存在。

public class SocketHolder { 
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); 
    private final Map<DatacenterEnum, List<SocketInfo>> liveSocketsByDc = new ConcurrentHashMap<>(); 

    // Lazy Loaded Singleton Pattern 
    private static class Holder { 
    private static final SocketHolder INSTANCE = new SocketHolder(); 
    } 

    public static SocketHolder getInstance() { 
    return Holder.INSTANCE; 
    } 

    private SocketHolder() { 
    connectToSockets(); 
    scheduler.scheduleAtFixedRate(new Runnable() { 
     public void run() { 
     updateSockets(); 
     } 
    }, 30, 30, TimeUnit.SECONDS); 
    } 

    private void connectToSockets() { 
    Map<DatacenterEnum, ImmutableList<String>> socketsByDc = TestUtils.SERVERS; 
    for (Map.Entry<DatacenterEnum, ImmutableList<String>> entry : socketsByDc.entrySet()) { 
     List<SocketInfo> addedColoSockets = connect(entry.getKey(), entry.getValue(), ZMQ.PUSH); 
     liveSocketsByDc.put(entry.getKey(), addedColoSockets); 
    } 
    } 

    private List<SocketInfo> connect(DatacenterEnum dc, List<String> addresses, int socketType) { 
    List<SocketInfo> socketList = new ArrayList<>(); 
    // ... some code here 
    return socketList; 
    } 

    // called from multiple reader threads to get next live available socket 
    public Optional<SocketInfo> getNextSocket() { 
    Optional<SocketInfo> liveSocket = getLiveSocket(liveSocketsByDc.get(DatacenterEnum.CORP)); 
    return liveSocket; 
    } 

    private Optional<SocketInfo> getLiveSocket(final List<SocketInfo> listOfEndPoints) { 
    if (!CollectionUtils.isEmpty(listOfEndPoints)) { 
     Collections.shuffle(listOfEndPoints); 
     for (SocketInfo obj : listOfEndPoints) { 
     if (obj.isLive()) { 
      return Optional.of(obj); 
     } 
     } 
    } 
    return Optional.absent(); 
    } 

    // update CHM map every 30 seconds 
    private void updateSockets() { 
    Map<DatacenterEnum, ImmutableList<String>> socketsByDc = TestUtils.SERVERS; 

    for (Entry<DatacenterEnum, ImmutableList<String>> entry : socketsByDc.entrySet()) { 
     List<SocketInfo> liveSockets = liveSocketsByDc.get(entry.getKey()); 
     List<SocketInfo> liveUpdatedSockets = new ArrayList<>(); 
     for (SocketInfo liveSocket : liveSockets) { 
     Socket socket = liveSocket.getSocket(); 
     String endpoint = liveSocket.getEndpoint(); 

     boolean sent = ....; 

     boolean isLive = sent ? true : false; 

     // is this right here? or will it cause any race condition? 
     SocketInfo state = new SocketInfo(socket, liveSocket.getContext(), endpoint, isLive); 
     liveUpdatedSockets.add(state); 
     } 
     // update map with new liveUpdatedSockets 
     liveSocketsByDc.put(entry.getKey(), liveUpdatedSockets); 
    } 
    } 
} 

问:

是我上面的代码线程安全的,有在updateSockets()getNextSocket()方法的竞争状态?

在我updateSockets()的方法,我提取liveSocketsByDc的ConcurrentHashMap List<SocketInfo>这是之前已经在connectToSockets()方法初始化或30秒的下一个时间间隔内updateSockets()方法填充,然后我遍历同一个列表liveSockets并根据创建一个新的SocketInfo对象不管isLive是真是假。然后我用这个新的SocketInfo对象更新liveSocketsByDc ConcurrentHashMap。这看起来正确吗?由于来自多个读者线程,我打算拨打getNextSocket()方法,它会调用getLiveSocket方法,该方法使用相同的地图获取下一个可用的活动套接字。

我在迭代liveSockets列表,然后创建一个新的SocketInfo对象,只需更改isLive字段,其​​他内容将保持不变。这是正确的吗?

如果存在线程安全问题,解决此问题的最佳方法是什么?

+0

不,它不是线程安全的。我发现的第一个违规行为(没有更进一步)是每一次读取都会将一个共享的ArrayList从地图中取出并对其进行洗牌。 –

+0

这个问题可能适合我们的姊妹网站[代码评论](http://codereview.stackexchange.com/)比它在这里。 –

+0

@JBNizet我明白了,我们如何解决这个问题? – user1234

回答

1

这里:

List<SocketInfo> liveSockets = liveSocketsByDc.get(entry.getKey()); 

你不同的线程中可能写入/读取的相同列表对象并行。

所以:不是线程安全的。它没有帮助有一个“外部”线程安全的数据结构;当那个线程安全的东西包含数据...这不是线程安全的;但后来“并行”!

+0

在我的代码中修复这个线程安全问题的最好方法是什么? – user1234