2013-03-06 116 views
1

所以事情是我有一个对象,做一个计算密集型任务。如何使用线程池并重新加入java中的线程?

假设主类有两个这样的对象,我按以下方式对该过程进行了并行化。

Thread t1 = new Thread(new Runnable() { 
      @Override 
      public void run() { 
        obj1.compute(); 
      } 
     }); 
Thread t2 = new Thread(new Runnable() { 
      @Override 
      public void run() { 
        obj2.compute(); 
      } 
     }); 

     t1.start(); 
     t2.start(); 
     t1.join(); 
     t2.join(); 

但是,如果我有任意数量的对象,存储在列表中,那么最好的方法是什么? (假定数量为10)。我正在寻找执行者服务,但不知道如何重新加入线程。 ForkJoinPool应该这样做,但我找不到如何使其工作。

我的优先级是简单,清晰的代码,而不是性能(因为任何开销在计算需要10分钟内看不见)。

回答

4

您可以简单地尝试在退回的期货上拨打get,这将阻止任务完成。例如:

ExecutorService executor = Executors.newFixedThreadPool(10); 
List<Future> futures = new ArrayList<>(); 

for (Runnable r : yourListOfRunnables) { 
    futures.add(executor.submit(r)); 
} 

//now do the equivalent of join: 

try { 
    for (Future f : futures) { 
     f.get(); //blocks until the runnable completes 
    } 
} catch (...) { } 

注:当你完成别忘了shutdown执行人或它可能会阻止应用程序退出。

或者,如果这是一个一次性的事情,你可以shutdown执行程序和等待,直到它被终止:

ExecutorService executor = Executors.newFixedThreadPool(10); 

for (Runnable r : yourListOfRunnables) { 
    executor.submit(r); 
} 

executor.shutdown(); //do not accept more tasks 
executor.awaitTermination(Long.MAX_VALUE, SECONDS); //waits until all tasks complete 

//at this point: all tasks have completed and 
//your executor is terminated: you can't reuse it 
+0

不是吗? – 2013-03-06 19:47:12

0

在本教程请看:Java Thread Pool Example using Executors and ThreadPoolExecutorExecutorService的文档。如果你犯了一些可运行的任务,因为你做以上,然后将它们添加到列表中,你应该能够同时给他们所有的执行者,并让他们在一个集合中的所有回过:

tasks = new HashSet<Runnable>(); 
    for (int i = 0; i < 10; i++) { 
     Runnable worker = new WorkerThread('' + i); 
     tasks.add(worker); 
     } 

    //start processing and get a list of all the futures 
    List<Future> futures = executor.invokeAll(tasks); 
    for (Future f: futures) { 
     f.get(); 
    } 

列表中的第一个未来可能不是第一个将来完成的,但延迟获取价值不应该让事情减慢太多。

1

的另一种方式,仍然使用java.util.concurrent

import java.util.concurrent.CountDownLatch; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.TimeUnit; 

public class ForkJoin { 
    static final int THREAD_COUNT = 10; 
    public static void main(String[] args) throws InterruptedException { 
     ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); 
     final CountDownLatch cdl = new CountDownLatch(THREAD_COUNT); 
     for(int i = 0; i < THREAD_COUNT; ++i) { 
     executor.execute(new Runnable(){ 
      @Override public void run(){ 
       try { 
        Thread.sleep((long)(2000.0 + 1000.0*Math.random())); 
        System.err.println("Done."); 
        cdl.countDown(); 
       } 
       catch(InterruptedException e) { 
        e.printStackTrace(); 
       } 
      }}); 
     } 
     cdl.await(1, TimeUnit.DAYS); 
     executor.shutdownNow(); 
     System.err.println("All done."); 
    } 
} 

输出:你觉得爱情

Done. 
Done. 
Done. 
Done. 
Done. 
Done. 
Done. 
Done. 
Done. 
Done. 
All done.