2010-07-31 123 views
3

我有一个底层的异步无序查询/响应系统,我想同步。查询和响应可以通过用唯一ID标记查询来映射,这些ID随后将伴随相应的响应。使异步查询同步

我试图使它同步使用两个ConcurrentHashMap s:一个从ID映射到结果,一个从相同ID映射到CountDownLatch es。

public Result execute(Query query) throws InterruptedException { 
    int id = atomicInteger.incrementAndGet(); 
    CountDownLatch latch = new CountDownLatch(1); 
    latchMap.put(id, latch); 
    query.executeAsyncWithId(id); // Probably returns before result is ready 
    try { 
     latch.await(); // Blocks until result is ready 
     return resultMap.remove(id); 
    } catch (InterruptedException e) { 
     latchMap.remove(id); // We are not waiting anymore 
     throw e; 
    } 
} 

以及处理传入的结果代码:执行查询时,代码是这样

public void handleResult(Result result) { 
    int id = result.getId(); 
    CountDownLatch latch = latchMap.remove(id); 
    if (latch == null) { 
     return; // Nobody wants result 
    } 
    resultMap.put(id, result); 
    latch.countDown(); 
} 

这种方法是从一个线程读取从底层系统的所有传入的结果称为(只有一个这样的阅读器线程)。

首先,我不确定线程​​安全性,但似乎没有必要为此使用两个HashMap(特别是因为ID从不重复使用)。任何改进想法?

+0

因此,删除使用“闩锁”的代码,但不应在此函数内定义。我没有看到任何会使这种异步的东西。 – 2010-07-31 21:07:30

+0

它是实际执行异步查询的底层系统。我已经将该方法重命名为“executeAsyncWithId”来澄清。 – hakos 2010-07-31 21:14:31

+0

嘿,我建议你发布自己的答案作为答案,并接受! – 2010-08-01 07:01:32

回答

4

通过this answer to a similar question灵感的新尝试:

public class ResultFuture { 

    private volatile Result result = null; 
    private final CountDownLatch latch = new CountDownLatch(1); 

    public Result get() throws InterruptedException { 
     latch.await(); 
     return result; 
    } 

    public void set(Result result) { 
     this.result = result; 
     latch.countDown(); 
    } 
} 

现在我只需要一个HashMap这些ResultFuture S的:

public Result execute(Query query) throws InterruptedException { 
    int id = atomicInteger.incrementAndGet(); 
    ResultFuture resultFuture = new ResultFuture(); 
    resultFutureMap.put(id, resultFuture); 
    query.executeAsyncWithId(id); // Probably returns before result is ready 
    try { 
     return resultFuture.get(); // Blocks until result is ready 
    } finally { 
     resultFutureMap.remove(id); 
    } 
} 

public void handleResult(Result result) { 
    int id = result.getId(); 
    ResultFuture resultFuture = resultFutureMap.get(id); 
    if (resultFuture == null) { 
     return; // Nobody wants result 
    } 
    resultFuture.set(result); 
} 
+0

这个答案应该被删除,并把文本放在原来的问题作为更新。 – 2010-07-31 22:29:52

+0

@詹姆斯布莱克,这是完全可以接受的问和回答你自己的问题。 @hakos至少应该接受他自己的回答:) – 2010-08-02 05:07:26

+0

@Tim Bender - 好了,仔细看后,我可以看出这是一个很好的答案,我最初认为这只是对原始示例的修改,只有一个HashMap。 – 2010-08-02 09:33:52

0

ConcurrentHashMap使用意味着你应该相信这一点被等被定义为原子在文档的方法:

  • putIfAbsent(K key, V val)
  • replace(K key, V val)
  • remove(K key, V val)

所以如果你打算保持他们你应该改变你的用法,这至少可以保证你的hashmaps的线程安全。

除了为每个请求创建一个新的Executor之外,它还会返回结果本身,以便线程在继续工作前等待它的完成:以这种方式,您将以同步的方式完成整个事件。 。

0

我认为CountDownLatch的版本可能是最好的解决方案。 “实践中的Java并发性”(Brian Goetz)实际上谈到了这一点(我认为他称之为价值锁存)。它基本上是一个设置值的一次性同步机制。

尽管可以通过wait() - notifyAll()机制实现这样的值锁存,但使用CountDownLatch会产生更简单的实现。

CountDownLatch await() - countDown()方法提供了一个恰当的happen-before关系。