2017-07-29 43 views
0

我正在使用Java ExecutorService(ThreadPool)执行任务&更新UI,而特定活动处于前景(可见)。Java ExecutorService - 任务/可调用不取消/中断

问题: 我要的是,当用户切换到另一个活动,我想停止/取消所有任务(不管是排队或正在运行)。为此,我必须在通过调用isDone()来检查Future对象的状态之后,对ExecutorService提交方法返回的Future对象使用ExecutorService shutdown/shutdownNow方法或取消(true)。这将设置中断的相应线程标志为TRUE,我必须在我的可调用实现中检查(Thread.currentThread.isInterrupted()),以确定是否中断退出任务/线程。问题是我是否调用ExecutorService的shutdown方法或将来的取消这两种情况下(true)方法很少的1 10倍,这将线程中断标志设置为TRUE这是最终导致内存泄漏等

代码:

线程池辛格尔顿实现(cancelAll-取消任务& shutdownExecutor到关机的ExecutorService):

private static class ThreadPoolManager { 

    private ExecutorService executorService; 
    private List<Future> queuedFutures; 
    private BlockingQueue<Runnable> blockingQueue; 

    private static ThreadPoolManager instance; 

    private ThreadPoolManager() { 
     MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-created(constructor)"); 
     queuedFutures = new ArrayList<>(); 
     blockingQueue = new LinkedBlockingDeque<>(); 
     executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue); 
    } 

    static { 
     instance = new ThreadPoolManager(); 
    } 

    public static void submitItemTest(Callable<Object> callable) { 
     MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted item test"); 
     if(instance.executorService.isShutdown()){ 
      instance=new ThreadPoolManager(); 
     } 
     Future future = instance.executorService.submit(callable); 
     instance.queuedFutures.add(future); 
    } 

    public static void submitTestAll(Callable<Object> callable) { 
     MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted test all"); 
     if(instance.executorService.isShutdown()){ 
      instance=new ThreadPoolManager(); 
     } 
     cancelAll(); 
     Future future = instance.executorService.submit(callable); 
     instance.queuedFutures.add(future); 
    } 

    public static void cancelAll() { 
     MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelling all future tasks"); 
     instance.blockingQueue.clear(); 
     for (Future future : instance.queuedFutures) { 
      if (!future.isDone()) { 
       boolean cancelled = future.cancel(true); 
       MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelled-" + cancelled); 
      } 
     } 
     instance.queuedFutures.clear(); 
    } 

    public static void shutdownExecutor(){ 
     MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Shuttingdown threadpool"); 
     instance.executorService.shutdownNow(); 
    } 
} 

可赎回执行情况(正常迭代& if语句来检查中断) :

private Callable<Object> getTestAllCallable() { 
     return new Callable<Object>() { 
      @Override 
      public Object call() { 
       for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) { 
        if (!Thread.currentThread().isInterrupted()) { 
          //someWork 

        } else { 
         MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadInterrupted-Cancelling"); 
         return null; 
        } 
       } 
       return null; 
      } 
     }; 
    } 

活动/片段的onStop实现(调用取消任务&关机):

@Override 
public void onStop() { 
    MyLogger.log(MyLogger.LOG_TYPE.INFO, "onStop called"); 
    ThreadPoolManager.cancelAll(); 
    ThreadPoolManager.shutdownExecutor(); 
    super.onStop(); 
} 

更新:由

变化:

  1. 使用迁可运行而不是可调用的。

  2. 现在不使用单例执行ExecutorService。

    private class ThreadPoolManager { 
    
        private ExecutorService executorService; 
        private List<Future> queuedFutures; 
        private BlockingQueue<Runnable> blockingQueue; 
    
        private ThreadPoolManager() { 
         MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-created(constructor)"); 
         queuedFutures = new ArrayList<>(); 
         blockingQueue = new LinkedBlockingDeque<>(); 
         executorService =getNewExecutorService(); 
        } 
    
        private ExecutorService getNewExecutorService(){ 
         return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue); 
        } 
    
        private void submitItemTest(Runnable runnable) { 
         MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted item test"); 
         if(executorService.isShutdown()){ 
          executorService=getNewExecutorService(); 
         } 
         Future future = executorService.submit(runnable); 
         queuedFutures.add(future); 
        } 
    
        private void submitTestAll(Runnable runnable) { 
         MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted test all"); 
         if(executorService.isShutdown()){ 
          executorService=getNewExecutorService(); 
         } 
         cancelAll(); 
         Future future = executorService.submit(runnable); 
         queuedFutures.add(future); 
        } 
    
        private void cancelAll() { 
         MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelling all future tasks"); 
         blockingQueue.clear(); 
         for (Future future : queuedFutures) { 
          if (!future.isDone()) { 
           boolean cancelled = future.cancel(true); 
           MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelled-" + cancelled); 
          } 
         } 
         queuedFutures.clear(); 
        } 
    
        private void shutdownExecutor(){ 
         MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Shuttingdown threadpool"); 
         executorService.shutdownNow(); 
         blockingQueue.clear(); 
         queuedFutures.clear(); 
        } 
    } 
    

发现了罪魁祸首,但不是解决办法呢。以下是正在运行的Runnables 1(isInterrupted返回true或InterupptedException和任务结束)的实现,但不包含其他。

工作Runnable接口(我用它测试):

new Runnable() { 
      @Override 
      public void run() { 
        int i=0; 
        while(!Thread.currentThread().isInterrupted()){ 
         try { 
          System.out.println(i); 
          Thread.currentThread().sleep(2000); 
         } catch (InterruptedException e) { 
          MyLogger.log(MyLogger.LOG_TYPE.DEBUG,"Interrupted"); 
          return; 
         } 
         i++; 
        } 
       } 
      } 

不工作(实际的代码我想使用):

new Runnable(){ 
      @Override 
      public void run() { 
       for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) { 
        if (!Thread.currentThread().isInterrupted()) { 

        } else { 
         MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Thread Interrupted (Cancelled)"); 
         break; 
        } 
       } 
      } 
     }; 

和1个可能的解决办法是使用变量(布尔)作为runnable中的中断标志,我认为这是最后的手段,但很乐意了解这个错误。

回答

0

解决方案(出路): 所以最后我又继续使用自定义的内部标志(布尔)作为一个线程中断标志将在每次迭代时由MyRunnable进行检查(自定义标志的自定义实现,以便每个可运行标志都有一个标志)。当需要在ExecutorService(ThreadPool)下取消线程时,我遍历所有Future对象,并获取它与MyRunnable相关联,并将其中断标志(自定义标志)设置为true,而不是中断/关闭线程。

ThreadPoolManager:

private class ThreadPoolManager { 

     private ExecutorService executorService; 
     private final Map<Future,MyRunnable> queuedFutures; 
     private final BlockingQueue<Runnable> blockingQueue; 

     private ThreadPoolManager() { 
      MyLogger.log(DEBUG, "Threadpool-created(constructor)"); 
      queuedFutures = new HashMap<>(); 
      blockingQueue = new LinkedBlockingDeque<>(); 
      executorService = getNewExecutorService(); 
     } 

     private ExecutorService getNewExecutorService() { 
      return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue); 
     } 

     private void submitItemTest(MyRunnable runnable) { 
      MyLogger.log(DEBUG, "Threadpool-submitted item test"); 
      if (executorService.isShutdown()) { 
       executorService = getNewExecutorService(); 
      } 
      Future future = executorService.submit(runnable); 
      queuedFutures.put(future,runnable); 
     } 

     private void submitTestAll(MyRunnable runnable) { 
      MyLogger.log(DEBUG, "Threadpool-submitted test all"); 
      if (executorService.isShutdown()) { 
       executorService = getNewExecutorService(); 
      } 
      cancelAll(); 
      Future future = executorService.submit(runnable); 
      queuedFutures.put(future,runnable); 
     } 

     private void cancelAll() { 
      MyLogger.log(DEBUG, "ThreadPool: Cancelling all future tasks"); 
      blockingQueue.clear(); 
      for (Future future : queuedFutures.keySet()) { 
       if (!future.isDone()) { 
        queuedFutures.get(future).continueRunning=false; 
        MyLogger.log(DEBUG, "Cancelled"); 
       } 
      } 
      queuedFutures.clear(); 
     } 

     private void shutdownExecutor() { 
      cancelAll(); 
      MyLogger.log(DEBUG, "ThreadPool: Shuttingdown threadpool"); 
      executorService.shutdown(); 
     } 
    } 

MyRunnable(实现可运行抽象类):

private abstract class MyRunnable implements Runnable { 
     boolean continueRunning=true; 
    } 

MyRunnable(抽象类MyRunnable的实例):

new MyRunnable() { 
     @Override 
     public void run() { 
      for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) { 
       if (continueRunning) { 
         //someWork 
       } else { 
        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadPool: Pool Thread Interrupted (closing down)"); 
        break; 
       } 
      } 
      System.out.println("ThreadPool: Test complete"); 
     } 
    }; 

现在,调用threadPoolManager.shutdownExecutor()关闭/中断当前正在运行的所有线程。

0

根据ExecutorService文档,关闭正在执行的任务是尽最大努力完成的。

因此,当你调用ExecutorService.shutdownNow()实施将尝试关闭所有当前正在执行的任务。每个任务将保持运行,直到它检测到它被中断为止。

为了保证您的线程达到这一点在早期阶段,它是一个好主意,在你的循环添加一个检查线程是否被interuppted如下:

Thread.currentThread().isInterrupted(); 

通过使每此调用迭代你的线程会从实际的中断中间隔很短的时间内检测到中断。

所以修改后的Callable代码如下所示:

private Callable<Object> getTestAllCallable() { 
    return new Callable<Object>() { 
     @Override 
     public Object call() { 
      for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) { 
       if(Thread.currentThread().isInterrupted()) { 
        return null; 
       } 
       if(someCondition) { 
        //someWork 
       } else { 
        MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadInterrupted-Cancelling"); 
        return null; 
       } 
      } 
      return null; 
     } 
    }; 
} 

顺便说一句,有在使用Callable,如果你不打算从call()方法返回的任何值没有意义。如果你需要在你的任务参数化类型只需要创建一个参数Runnable如下:

public class ParameterizedRunnable<T> implements Runnable { 
    private final T t; 

    public ParameterizedRunnable(T t) { 
     this.t = t; 
    } 

    public void run() { 
     //do some work here 
    } 
} 
+0

对不起,我现在编辑我的代码,包括你已经陈述了实际上我已经这样做,如问题描述中所述,但与其他代码不相关的清除。 –

+0

这并不能解决你的问题? – zuckermanori

+0

我正在使用callable而不是runnable与想法获取Future对象,同时将可调用对象提交给ExecutorService,现在知道它也可以使用runnable来完成。感谢您告诉我,我将更改代码以使用runnable。 –