2011-11-29 38 views
4

我通过交互式会话+在独立线程上运行的私有源(InputStream)连接到外部服务。在交互式会话中,我发送传出消息并接收包含不同字段的对象的同步响应,其中一个是ID和“状态”,以确认成功或失败。同时,我会收到有关此ID的私人Feed的消息,并进一步提供“状态”更新。我目前在一个ConcurrentHashMap中存储关于每个ID的状态的信息。我必须在这些对象上保留正确的事件序列,但目前我正在获得竞争条件,在我接收和处理交互式会话上的同步响应之前,有时我会处理和更新专用Feed上的对象,因此让我与ID的过时和不正确的状态。Java在并发中集合/获取集合

理想情况下,我希望拥有某种类型的带PutIfKeyExistOrWait(w超时)方法的集合,只有在密钥存在或等待时才会更新该值,以便在处理私有订阅源上的对象时使用。

有谁知道是否有合适的集合可用或可以建议我的问题的替代解决方案?谢谢。

+0

如果稍后(当ID已存在时),PutIfKeyExistOrWait将无法帮助您收到2个异步通知,但由于竞争条件而以错误的顺序处理它们。这听起来像唯一的防弹方法是,如果传入的消息具有序列号。如果他们不这样做,我想下一个最好的办法是在每封邮件中附上收到邮件时的时间戳,然后根据时间戳对状态进行排序。 –

+0

@EliAcherkan谢谢。私人订阅源上的异步消息是在单个线程中接收的,所以我可以'保证'这些消息的正确顺序。只有同步响应是导致问题的异步消息。时间戳也不起作用。这些消息通常在同一毫秒内接收,并且不能保证在私人订阅源上的消息之前收到同步响应,因此存在问题。 – hgus1294

+0

对不起,我能想到的唯一选择是交互式会话具有一个“同步”块,用于检查密钥是否已通过私人订阅源插入到地图中,并相应地插入/更新状态。 –

回答

0

您已经有一些ConcurrentHashMap iDAndStatus存储ID和最新状态。但是,我只会让处理该服务的线程在该地图中创建一个新条目。

当消息从Feed中到达时,如果ID已存在于iDAndStatus中,它只是修改状态。如果密钥不存在,只需将ID /状态更新临时存储在其他数据结构pendingFeedUpdates中。

每次在iDAndStatus中创建新条目时,请检查pendingFeedUpdates以查看是否存在新ID的某些更新。

我不确定用于pendingFeedUpdates的同步数据结构:您需要通过ID检索,但每个ID可能有许多消息,并且您希望保留消息的顺序。也许一个同步的HashMap将每个ID与某种类型的同步排序队列关联起来?

+0

谢谢。我正在用你的建议版本。当我在交互式会话上处理同步消息时,我检查是否已经从私人订阅源获取相同ID的状态更新。在这种情况下,我简单地忽略同步响应并且不更新状态。我将所有邮件存储在单独的收藏集中以保留订单线索,但我可以在列表中轻微混乱地生活。感谢所有伟大的意见和答案。 – hgus1294

0

我建议你看一下Collections.getSynchronized集合:http://docs.oracle.com/javase/1.4.2/docs/api/java/util/Collections.html#synchronizedList%28java.util。列表%29

这可能可能解决您的问题另一个选项取决于如何调用该方法是一种同步方法,它允许线程安全执行并确保事务的原子性。请参阅http://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html

第三种选择是根据您尝试实现的内容,在乐观或悲观方法之后,在应用程序内强制实施并发管理控制。这是3中最复杂的一个,但如果再加上以前的选项,会给你更大的控制权。

这实际上取决于您的具体实施。

+0

谢谢。我不知道普通的同步或锁定会如何解决我的问题。我仍然会遇到种族情况,在同步响应之前,私人Feed上的消息可能会获得锁定。我应该指出,在交互式会话中发送请求之前,我不知道该ID,否则我可能已锁定与该ID对应的密钥,直到我收到并处理了同步响应。 – hgus1294

+0

而不是锁定记录,您可以通过检测冲突并分别处理这些冲突来尝试乐观锁定。http://www.agiledata.org/essays/concurrencyControl.html#OptimisticLocking有几种方法可以检测并发冲突我的个人偏好增量行版本。因此,如果行版本与您的数据库中存在冲突的行数不相等,交互将发回ID和行版本,这将允许您单独处理它。 – Stainedart

2

你可以尝试封装逻辑处理这种情况到您的地图,像这样的价值观:

  • 如果给的螺纹是第一个加入了特定的ID值,该值被认为是不完整的,线程等待,直到它完成
  • 如果交互式会话线程不是第一次添加一个值,它标志着不完全值作为完全从地图
让它们时
  • 不完整的值将被视为缺席

    该解决方案基于putIfAbsent()的原子性。

    public class StatusMap { 
        private Map<Long, StatusHolder> map = new ConcurrentHashMap<Long, StatusHolder>(); 
    
        public Status getStatus(long id) { 
         StatusHolder holder = map.get(id); 
         if (holder == null || holder.isIncomplete()) { 
          return null; 
         } else { 
          return holder.getStatus(); 
         } 
        } 
    
        public void newStatusFromInteractiveSession(long id, Status status) { 
         StatusHolder holder = StatusHolder.newComplete(status); 
         if ((holder = map.putIfAbsent(id, holder)) != null) { 
          holder.makeComplete(status); // Holder already exists, complete it 
         } 
        } 
    
        public void newStatusFromFeed(long id, Status status) { 
         StatusHolder incomplete = StatusHolder.newIncomplete(); 
         StatusHolder holder = null; 
         if ((holder = map.putIfAbsent(id, incomplete)) == null) { 
          holder = incomplete; // New holder added, wait for its completion 
          holder.waitForCompletion(); 
         } 
         holder.updateStatus(status); 
        } 
    } 
    
    public class StatusHolder { 
        private volatile Status status; 
        private volatile boolean incomplete; 
        private Object lock = new Object(); 
    
        private StatusHolder(Status status, boolean incomplete) { ... } 
    
        public static StatusHolder newComplete(Status status) { 
         return new StatusHolder(status, false); 
        } 
    
        public static StatusHolder newIncomplete() { 
         return new StatusHolder(null, true); 
        } 
    
        public boolean isIncomplete() { return incomplete; } 
    
        public void makeComplete(Status status) { 
         synchronized (lock) { 
          this.status = status; 
          incomplete = false; 
          lock.notifyAll(); 
         } 
        } 
    
        public void waitForCompletion() { 
         synchronized (lock) { 
          while (incomplete) lock.wait(); 
         } 
        } 
        ... 
    } 
    
  • +0

    谢谢。 +1的雄心勃勃的答案。你的建议是沿着我目前正在做的一个解决方法,我尝试根据源和状态来测试序列。该映射涉及一些相当棘手的情况,例如,在某些情况下,私人信息饲料没有更新,所以我想看看是否有我忽略的更简洁更清洁的解决方案。 – hgus1294