我正在使用Java ExecutorService(ThreadPool)执行任务&更新UI,而特定活动处于前景(可见)。Java ExecutorService - 任务/可调用不取消/中断
问题: 我要的是,当用户切换到另一个活动,我想停止/取消所有任务(不管是排队或正在运行)。为此,我必须在通过调用isDone()来检查Future对象的状态之后,对ExecutorService提交方法返回的Future对象使用ExecutorService shutdown/shutdownNow方法或取消(true)。这将设置中断的相应线程标志为TRUE,我必须在我的可调用实现中检查(Thread.currentThread.isInterrupted()),以确定是否中断退出任务/线程。问题是我是否调用ExecutorService的shutdown方法或将来的取消这两种情况下(true)方法很少的1 10倍,这将线程中断标志设置为TRUE这是最终导致内存泄漏等
代码:
线程池辛格尔顿实现(cancelAll-取消任务& shutdownExecutor到关机的ExecutorService):
private static class ThreadPoolManager {
private ExecutorService executorService;
private List<Future> queuedFutures;
private BlockingQueue<Runnable> blockingQueue;
private static ThreadPoolManager instance;
private ThreadPoolManager() {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-created(constructor)");
queuedFutures = new ArrayList<>();
blockingQueue = new LinkedBlockingDeque<>();
executorService = new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue);
}
static {
instance = new ThreadPoolManager();
}
public static void submitItemTest(Callable<Object> callable) {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted item test");
if(instance.executorService.isShutdown()){
instance=new ThreadPoolManager();
}
Future future = instance.executorService.submit(callable);
instance.queuedFutures.add(future);
}
public static void submitTestAll(Callable<Object> callable) {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted test all");
if(instance.executorService.isShutdown()){
instance=new ThreadPoolManager();
}
cancelAll();
Future future = instance.executorService.submit(callable);
instance.queuedFutures.add(future);
}
public static void cancelAll() {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelling all future tasks");
instance.blockingQueue.clear();
for (Future future : instance.queuedFutures) {
if (!future.isDone()) {
boolean cancelled = future.cancel(true);
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelled-" + cancelled);
}
}
instance.queuedFutures.clear();
}
public static void shutdownExecutor(){
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Shuttingdown threadpool");
instance.executorService.shutdownNow();
}
}
可赎回执行情况(正常迭代& if语句来检查中断) :
private Callable<Object> getTestAllCallable() {
return new Callable<Object>() {
@Override
public Object call() {
for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) {
if (!Thread.currentThread().isInterrupted()) {
//someWork
} else {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "ThreadInterrupted-Cancelling");
return null;
}
}
return null;
}
};
}
活动/片段的onStop实现(调用取消任务&关机):
@Override
public void onStop() {
MyLogger.log(MyLogger.LOG_TYPE.INFO, "onStop called");
ThreadPoolManager.cancelAll();
ThreadPoolManager.shutdownExecutor();
super.onStop();
}
更新:由
变化:
使用迁可运行而不是可调用的。
现在不使用单例执行ExecutorService。
private class ThreadPoolManager { private ExecutorService executorService; private List<Future> queuedFutures; private BlockingQueue<Runnable> blockingQueue; private ThreadPoolManager() { MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-created(constructor)"); queuedFutures = new ArrayList<>(); blockingQueue = new LinkedBlockingDeque<>(); executorService =getNewExecutorService(); } private ExecutorService getNewExecutorService(){ return new ThreadPoolExecutor(Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), 1, TimeUnit.SECONDS, blockingQueue); } private void submitItemTest(Runnable runnable) { MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted item test"); if(executorService.isShutdown()){ executorService=getNewExecutorService(); } Future future = executorService.submit(runnable); queuedFutures.add(future); } private void submitTestAll(Runnable runnable) { MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Threadpool-submitted test all"); if(executorService.isShutdown()){ executorService=getNewExecutorService(); } cancelAll(); Future future = executorService.submit(runnable); queuedFutures.add(future); } private void cancelAll() { MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelling all future tasks"); blockingQueue.clear(); for (Future future : queuedFutures) { if (!future.isDone()) { boolean cancelled = future.cancel(true); MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Cancelled-" + cancelled); } } queuedFutures.clear(); } private void shutdownExecutor(){ MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Shuttingdown threadpool"); executorService.shutdownNow(); blockingQueue.clear(); queuedFutures.clear(); } }
发现了罪魁祸首,但不是解决办法呢。以下是正在运行的Runnables 1(isInterrupted返回true或InterupptedException和任务结束)的实现,但不包含其他。
工作Runnable接口(我用它测试):
new Runnable() {
@Override
public void run() {
int i=0;
while(!Thread.currentThread().isInterrupted()){
try {
System.out.println(i);
Thread.currentThread().sleep(2000);
} catch (InterruptedException e) {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG,"Interrupted");
return;
}
i++;
}
}
}
不工作(实际的代码我想使用):
new Runnable(){
@Override
public void run() {
for (int i = 0; i < inbuiltProxyPojoArrayList.size(); i++) {
if (!Thread.currentThread().isInterrupted()) {
} else {
MyLogger.log(MyLogger.LOG_TYPE.DEBUG, "Thread Interrupted (Cancelled)");
break;
}
}
}
};
和1个可能的解决办法是使用变量(布尔)作为runnable中的中断标志,我认为这是最后的手段,但很乐意了解这个错误。
对不起,我现在编辑我的代码,包括你已经陈述了实际上我已经这样做,如问题描述中所述,但与其他代码不相关的清除。 –
这并不能解决你的问题? – zuckermanori
我正在使用callable而不是runnable与想法获取Future对象,同时将可调用对象提交给ExecutorService,现在知道它也可以使用runnable来完成。感谢您告诉我,我将更改代码以使用runnable。 –