2010-12-04 56 views
4

归结为一个线程通过某种服务提交作业。作业在一些TPExecutor中执行。之后,此服务会在特定条件下检查结果并在原始线程中抛出异常(作业超过最大重试次数等)。下面的代码片段大致说明了此方案中的遗留代码:如何使用CountDownLatch正确同步/锁定

import java.util.concurrent.CountDownLatch; 

public class IncorrectLockingExample { 

private static class Request { 

    private final CountDownLatch latch = new CountDownLatch(1); 

    private Throwable throwable; 

    public void await() { 
     try { 
      latch.await(); 
     } catch (InterruptedException ignoredForDemoPurposes) { 
     } 
    } 

    public void countDown() { 
     latch.countDown(); 
    } 

    public Throwable getThrowable() { 
     return throwable; 
    } 

    public void setThrowable(Throwable throwable) { 
     this.throwable = throwable; 
    } 

} 

private static final Request wrapper = new Request(); 

public static void main(String[] args) throws InterruptedException { 

    final Thread blockedThread = new Thread() { 
     public void run() { 
      wrapper.await(); 
      synchronized (wrapper) { 
       if (wrapper.getThrowable() != null) 
        throw new RuntimeException(wrapper.getThrowable()); 
      } 
     } 
    }; 

    final Thread workingThread = new Thread() { 
     public void run() { 
      wrapper.setThrowable(new RuntimeException()); 
      wrapper.countDown(); 

     } 
    }; 

    blockedThread.start(); 
    workingThread.start(); 

    blockedThread.join(); 
    workingThread.join(); 
} 

}

有时,(在我的箱子不可复制的,但发生在16核服务器箱)异常没有得到报告给原来的线程。我认为这是因为发生 - 之前不是强迫的(例如'countDown'发生在'setThrowable'之前)并且程序继续工作(但应该失败)。 我将不胜感激关于如何解决这种情况的任何帮助。 约束条件是:一周内发布,需要对现有代码库的影响最小。

+0

250 KLOC项目在这里完全是多线程的,工作在16核心等。我们使用像“CountDownLatch”*“lot **”这样的“高级”多线程工具。我们使用低级别事物的次数,例如* Object *的wait *方法和* Thread *的join *方法? **零**。在我看来,现在在默认API中有足够多的高级并发功能,您不需要重新创建基于Java idiosynchrasies的任何碎轮。 +1给Peter Lawrey的答案。 – SyntaxT3rr0r 2010-12-04 13:18:36

+0

@ Webinator:OP *在此处使用“高级”CountDownLatch工具来实现其设计目的之一。 – willjcroz 2010-12-04 13:38:38

+0

您确定上面的代码不符合预期吗?我认为没有理由不纠正之后。 – willjcroz 2010-12-04 13:46:04

回答

6

上面的代码(现在已更新)应该按照您的预期工作,而不使用进一步的同步机制。通过使用CountDownLatchawait()countdown()方法来实施存储屏障及其相应的'发生在'之前的关系。

API docs:一个成功的“获取”方法,例如随后的

操作之前为“释放”同步器的方法,如Lock.unlock,Semaphore.release,和CountDownLatch.countDown发生-前行动如另一个线程中的同一个同步器对象上的Lock.lock,Semaphore.acquire,Condition.await和CountDownLatch.await。

如果您定期处理并发让自己的'Java Concurrency in Practice'副本,它是Java并发圣经,将是值得它的重量你的书架上:-)。

2

我怀疑你需要

private volatile Throwable throwable 

您是否尝试过使用,因为它是内置的,这是否给你一个ExecutorService。下面的打印

future1 := result 
future2 threw java.lang.IllegalStateException 
future3 timed out 

的代码是

public static void main(String... args) { 
    ExecutorService executor = Executors.newSingleThreadExecutor(); 
    Future<String> future1 = executor.submit(new Callable<String>() { 
     public String call() throws Exception { 
      return "result"; 
     } 
    }); 

    Future<String> future2 = executor.submit(new Callable<String>() { 
     public String call() throws Exception { 
      throw new IllegalStateException(); 
     } 
    }); 

    Future<String> future3 = executor.submit(new Callable<String>() { 
     public String call() throws Exception { 
      Thread.sleep(2000); 
      throw new AssertionError(); 
     } 
    }); 

    printResult("future1", future1); 
    printResult("future2", future2); 
    printResult("future3", future3); 
    executor.shutdown(); 
} 

private static void printResult(String description, Future<String> future) { 
    try { 
     System.out.println(description+" := "+future.get(1, TimeUnit.SECONDS)); 
    } catch (InterruptedException e) { 
     System.out.println(description+" interrupted"); 
    } catch (ExecutionException e) { 
     System.out.println(description+" threw "+e.getCause()); 
    } catch (TimeoutException e) { 
     System.out.println(description+" timed out"); 
    } 
} 

在为FutureTask的代码,有注释。

/** 
* The thread running task. When nulled after set/cancel, this 
* indicates that the results are accessible. Must be 
* volatile, to ensure visibility upon completion. 
*/ 

如果您不打算重新使用在JDK的代码,它仍然是值得一读,这样你可以在他们使用任何技巧回升。