2009-11-27 98 views
5

我已经从java.util.concurrent扩展FutureTask来提供回调以跟踪提交给ExecutorService的任务的执行情况。现在扩展FutureTask,如何处理取消

public class StatusTask<V> extends FutureTask<V> { 

    private final ITaskStatusHandler<V> statusHandler; 

    public StatusTask(Callable<V> callable, ITaskStatusHandler<V> statusHandler){ 
     super(callable); 
     if (statusHandler == null) 
      throw new NullPointerException("statusHandler cannot be null"); 
     this.statusHandler = statusHandler; 
     statusHandler.TaskCreated(this); 
    } 

    @Override 
    public void run() { 
     statusHandler.TaskRunning(this); 
     super.run(); 
    } 

    @Override 
    protected void done() { 
     super.done(); 
     statusHandler.TaskCompleted(this); 
    } 

} 

,我所看到的是,如果该任务被提交,但最终排队,我cancel(true);的任务 - run()方法仍然被调用 - 和FutureTask.run()(可能)会检查任务被取消,并没有按不打电话给包装的可回收物品。

我应该如

@Override 
public void run() { 
    if(!isCancelled()) { 
    statusHandler.TaskRunning(this); 
    super.run(); 
    } 
} 

还是应该叫super.run()?这两种方法似乎都容易受到检查取消和做某件事情之间的竞争条件的影响..任何想法都会受到赞赏。

回答

3

你说得对,有一个比赛中出现。 FutureTask#done()最多会被称为一次,因此如果任务在通过RunnableFuture#run()运行之前已被取消,您将错过FutureTask#done()的呼叫。

你有没有考虑过一种更简单的方法,总是发出一组对称呼叫到ITaskStatusHandler#taskRunning()ITaskStatusHandler#taskCompleted(),像这样?

@Override 
public void run() { 
    statusHandler.TaskRunning(this); 
    try { 
    super.run(); 
    finally { 
    statusHandler.TaskCompleted(this); 
    } 
} 

一旦RunnableFuture#run()被调用,这是真的,你在运行的任务,或者至少试图运行。一旦FutureTask#run()完成,您的任务不再运行。恰巧在取消的情况下,转换是(几乎)立即的。

试图避免调用ITaskStatusHandler#taskRunning()如果内部CallableRunnable永远不会被调用FutureTask#run()将要求您建立CallableRunnableFutureTask派生类型本身之间的一些共享的结构,这样,当第一次叫你的内在功能您设置了一些标志,表明外部FutureTask衍生类型可以作为锁存器观察到,表明是的,在取消之前,函数确实开始运行。但是,到那时,你必须承诺致电ITaskStatusHandler#taskRunning(),所以区别不是很有用。

我一直在挣扎了类似的设计问题最近,彼时我重写FutureTask#run()方法操作后定居在对称之前。

+0

我假设在finally子句中的调用应该是statusHandler.TaskCompleted(this);有没有运行方法不会被调用,但done()方法呢? – nos 2009-11-27 21:20:51

+0

感谢您发现错误。我在finally块中修复了这个呼叫。 是的,可以在不调用run()的情况下调用done()。请参阅FutureTask#Sync#innerCancel(布尔值)。在那里,如果任务尚未完成,包括它从未开始运行,您可以看到done()将被调用。请注意,对于任何FutureTask实例,done()都将被调用0或1次:如果run()和cancel()都不会被调用,则返回0。 – seh 2009-11-27 21:52:50

+0

似乎,如果你将它提交给执行者完成()将被调用,如果该任务在执行之前被取消 - 尽管执行者不知道这一点。执行者只知道可运行的/可调用的。因此,run()会在最终变成可运行的时候被调用,在这种情况下,FutureTask#run()基本上什么都不做。 – nos 2009-11-27 22:14:54

2

你的问题是你的未来任务仍然被执行后,你已经调用取消他们,对吧?

将任务提交给执行程序服务后,应该由执行程序管理。 (如果你喜欢,你仍然可以取消一个任务。)你应该用executor shutdownNow method取消执行。 (这将调用所有提交任务的取消方法。)关机仍然会执行所有提交的任务。

执行者不会“知道”任务被取消。它将独立于未来任务的内部状态调用该方法。

最简单的方法是按照原样使用Executor框架并编写Callable装饰器。

class CallableDecorator{ 

    CallableDecorator(Decorated decorated){ 
    ... 
    } 

    setTask(FutureTask task){ 
    statusHandler.taskCreated(task); 
    } 

    void call(){ 
    try{ 
     statusHandler.taskRunning(task); 
     decorated.call(); 
    }finally{ 
     statusHandler.taskCompleted(task); 
    } 
    } 
} 

唯一的问题是,任务不能在装饰器的构造函数中。 (这是未来任务构造函数的一个参数。)要打破这个循环,您必须使用setter或代理工作来处理构造函数注入。也许这根本不需要回调,你可以说:statusHandler.callableStarted(decorated)

根据您的要求,您可能必须发出异常和中断信号。

基本实现:

class CallableDecorator<T> implements Callable<T> { 

    private final Callable<T> decorated; 
    CallableDecorator(Callable<T> decorated){ 
     this.decorated = decorated; 
    } 

    @Override public T call() throws Exception { 
     out.println("before " + currentThread()); 
     try { 
      return decorated.call(); 
     }catch(InterruptedException e){ 
      out.println("interupted " + currentThread()); 
      throw e; 
     } 
     finally { 
      out.println("after " + currentThread()); 
     } 
    } 
} 

ExecutorService executor = newFixedThreadPool(1); 
Future<Long> normal = executor.submit(new CallableDecorator<Long>(
     new Callable<Long>() { 
      @Override 
      public Long call() throws Exception { 
       return System.currentTimeMillis(); 
      } 
     })); 
out.println(normal.get()); 

Future<Long> blocking = executor.submit(new CallableDecorator<Long>(
     new Callable<Long>() { 
      @Override 
      public Long call() throws Exception { 
       sleep(MINUTES.toMillis(2)); // blocking call 
       return null; 
      } 
     })); 

sleep(SECONDS.toMillis(1)); 
blocking.cancel(true); // or executor.shutdownNow(); 

输出:

before Thread[pool-1-thread-1,5,main] 
after Thread[pool-1-thread-1,5,main] 
1259347519104 
before Thread[pool-1-thread-1,5,main] 
interupted Thread[pool-1-thread-1,5,main] 
after Thread[pool-1-thread-1,5,main] 
+2

严重的是,你想通过杀死整个执行者来控制一个可调用的对象吗?如果你有一个执行者每个可调用的话,那将会很好。在这种情况下,为什么甚至使用执行者?通常你想要有选择地取消工作,因此对Future的取消方法。 – james 2009-11-27 19:57:11

+0

否。示例代码显示如何杀死执行程序或单个任务。 – 2009-11-27 22:10:34