2011-06-14 38 views
2

我想执行多个可并行调用对象。但似乎ExecutorService总是等待,直到所有可调用的命令都完成。并行执行可调对象

我已经试过如下:

final int nThreads = 10; 
ExecutorService executorService = Executors.newFixedThreadPool(nThreads); 
List<PrimeCallable> tasks = new ArrayList<PrimeCallable>(); 
for(int i = 0; i < nThreads; i++) { 
    tasks.add(new PrimeCallable(0, i * 100 + 100, "thread" + i)); 
} 

try { 
    for(Future<List<Integer>> result : executorService.invokeAll(tasks)) { 
     List<Integer> integers = result.get(); 
     for(Integer i : integers){ 
      System.out.println(i); 
     } 
    } 
} catch (InterruptedException e) { 
    // TODO Auto-generated catch block 
    e.printStackTrace(); 
} catch (ExecutionException e) { 
    // TODO Auto-generated catch block 
    e.printStackTrace(); 
} 

现在,当所有ExecutorService的可调用的finnished for循环被调用。据我所知,没有executorService.isParallel setter ;-)。

什么是正确的方法让可调参数平行运行?

感谢您的提示!

回答

7

javadocs invokeAll说;

执行给定的任务,返回保持状态 期货的 列表和结果当所有完整。 Future.isDone()对于返回列表的每个元素都是true。

因此invokeAll阻塞,直到集合中的每个任务完成。

5

执行程序服务并行运行所有可调用程序。它所做的只是在等待所有并行任务完成之前完成。所以它不像所有的任务都以串行方式运行。

2

如果您想查看结果,请使用ExecutorCompletionService

3

这听起来像你想要的一部分是懒惰执行 - 你不想在提取结果之前制作内存中结构的副本。

我会认为这是一个迭代+转换问题。首先,在你的输入中定义一个迭代器,这样每次next()调用都会返回一个Callable,它将产生你的系列中的下一个值。

变换级是应用这些可调用的并行或并发的评估,像这样(未测试):

public class ConcurrentTransform 
{ 
    private final ExecutorService executor; 
    private final int maxBuffer; 

    public ConcurrentTransform(ExecutorService executor, int maxWorkBuffer) { 
    this.executor = executor; 
    this.maxBuffer = Math.max(1, maxWorkBuffer); 
    } 

    public <T> Iterator<T> apply(final Iterator<Callable<T>> input) { 
    // track submitted work 
    final BlockingQueue<Future<T>> submitted = new LinkedBlockingQueue<Future<T>>(); 

    // submit first N tasks 
    for (int i=0; i<maxBuffer && input.hasNext(); i++) { 
     Callable<T> task = input.next(); 
     Future<T> future = executor.submit(task); 
     submitted.add(future); 
    } 

    return new Iterator<T>(){ 
     @Override 
     public synchronized boolean hasNext() { 
     return !submitted.isEmpty(); 
     } 
     @Override 
     public T next() { 
     Future<T> result; 
     synchronized (this) { 
      result = submitted.poll(); 
      if (input.hasNext()) { 
      submitted.add(executor.submit(input.next())); 
      } 
     } 

     if (result != null) { 
      try { 
      return result.get(); // blocking 
      } catch (Exception e) { 
      if (e instanceof RuntimeException) { 
       throw (RuntimeException) e; 
      } else { 
       throw new RuntimeException(e); 
      } 
      } 
     } else { 
      throw new NoSuchElementException(); 
     } 
     } 
     @Override 
     public void remove() { 
     throw new UnsupportedOperationException(); 
     }}; 
    } 
} 

打电话申请(......)之后,你会遍历产生值,它们将在并行执行Callable对象并以与输入相同的顺序返回结果。一些改进是允许阻塞result.get()调用的可选超时,或者在变换本身内管理线程池。