2014-10-03 70 views
2

我有一个线程池执行者做同样的操作分批未来密钥列表()。所以我使用invokeall()方法来处理批次中的键列表。该用例是这样的,如果批处理中的任何任务返回错误,则没有必要继续处理其他密钥。因此,的ThreadPoolExecutor:从取消的invokeAll任务时,一个任务返回错误

  1. 我该如何取消批处理执行任务,一旦任务重新调用错误。
  2. 但不影响另一批按键执行。即每批应该取消注销。

感谢您的帮助。

回答

1

我看不出这可怎么没有一点定制来完成。我能想出的最简单的实现需要:

  • 一个专门的未来实现基本的FutureTask子类,覆盖setException()方法以取消所有其他任务时,任务抛出一个异常
  • 专门ThreadPoolExecutor实施将覆盖的invokeAll()尽量使用自定义的未来

的它是这样的:

自定义未来:

import java.util.Collection; 
import java.util.concurrent.*; 

public class MyFutureTask<V> extends FutureTask<V> { 
    private Callable<V> task; 
    private Collection<Future<V>> allFutures; 

    public MyFutureTask(Callable<V> task, Collection<Future<V>> allFutures) { 
    super(task); 
    this.task = task; 
    this.allFutures = allFutures; 
    } 

    @Override 
    protected void setException(Throwable t) { 
    super.setException(t); 
    synchronized(allFutures) { 
     for (Future<V> future: allFutures) { 
     if ((future != this) && !future.isDone()) { 
      future.cancel(true); 
     } 
     } 
    } 
    } 
} 

自定义线程池:

import java.util.*; 
import java.util.concurrent.*; 

public class MyThreadPool extends ThreadPoolExecutor { 
    public MyThreadPool(int size) { 
    super(size, size, 1L, TimeUnit.MILLISECONDS, 
     new LinkedBlockingQueue<Runnable>()); 
    } 

    @Override 
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 
    throws InterruptedException { 
    List<Future<T>> futures = new ArrayList<>(tasks.size()); 
    for (Callable<T> callable: tasks) { 
     futures.add(new MyFutureTask<>(callable, futures)); 
    } 
    for (Future<T> future: futures) { 
     execute((MyFutureTask<T>) future); 
    } 
    for (Future<T> future: futures) { 
     try { 
     future.get(); 
     } catch (ExecutionException|CancellationException e) { 
     // ignore this exception 
     } 
    } 
    return futures; 
    } 
} 

代码示例来测试它:

import java.util.*; 
import java.util.concurrent.*; 

public class TestThreadPool { 
    public static void main(final String[] args) { 
    ExecutorService executor = null; 
    try { 
     int size = 10; 
     executor = new MyThreadPool(size); 
     List<Callable<String>> tasks = new ArrayList<>(); 
     int count=1; 
     tasks.add(new MyCallable(count++, false)); 
     tasks.add(new MyCallable(count++, true)); 
     List<Future<String>> futures = executor.invokeAll(tasks); 
     System.out.println("results:"); 
     for (int i=0; i<futures.size(); i++) { 
     Future<String> f = futures.get(i); 
     try { 
      System.out.println(f.get()); 
     } catch (CancellationException e) { 
      System.out.println("CancellationException for task " + (i+1) + 
      ": " + e.getMessage()); 
     } catch (ExecutionException e) { 
      System.out.println("ExecutionException for task " + (i+1) + 
      ": " + e.getMessage()); 
     } 
     } 
    } catch(Exception e) { 
     e.printStackTrace(); 
    } finally { 
     if (executor != null) executor.shutdownNow(); 
    } 
    } 

    public static class MyCallable implements Callable<String> { 
    private final int index; 
    private final boolean simulateFailure; 

    public MyCallable(int index, boolean simulateFailure) { 
     this.index = index; 
     this.simulateFailure = simulateFailure; 
    } 

    @Override 
    public String call() throws Exception { 
     if (simulateFailure) { 
     throw new Exception("task " + index + " simulated failure"); 
     } 
     Thread.sleep(2000L); 
     return "task " + index + " succesful"; 
    } 
    } 
} 

和执行测试的最后结果,正如显示在输出控制台:

results: 
CancellationException for task 1: null 
ExecutionException for task 2: java.lang.Exception: task 2 simulated failure 
0
  1. 传递ExecutorService的参考每个任务波纹管:

    ExecutorService eServ = Executors.newFixedThreadPool(10); 
    
    Set<Callable<ReaderThread>> tasks = new HashSet<Callable<ReaderThread>>(); 
    
    for (int i = 0; i < 10 ; i++) 
    { 
        tasks.add(new ReaderThread(eServ)); 
    } 
    
    List<Future<ReaderThread>> lt = eServ.invokeAll(tasks); 
    
  2. 如果任务是错误然后调用shutdownNow()那么将停止所有的任务

    public ReaderThread call() throws Exception 
    { 
        try  
        { 
         for (int i = 1; i < 50; i++) 
         { 
          System.out.println("i="+i+"::"+Thread.currentThread()); 
          Thread.sleep(1000); 
    
          if (i == 10 && Thread.currentThread().toString().equals("Thread[pool-1-thread-7,5,main]")) 
          { 
           throw new Exception(); 
          }   
         } 
        } 
        catch (Exception exc) 
        { 
         ex.shutdownNow();  
        } 
    
        return this; 
    }