2012-07-19 70 views
2

我有下面的代码片段,它运行良好。但问题是它创建并将执行器队列上的2000多个任务放在了一边。如何不压垮java executorservice任务队列?

我需要检查执行程序队列中的任务是否已完成,然后才能给它更多的任务。它不一定是确切的,即如果队列有剩余10个任务,则再添加50个。

所以执行程序任务队列没有这么多挂起的任务,这也将允许shutdown()及时工作,否则即使被调用,执行程序仍会尝试先完成队列中的所有2000个任务。

完成此操作的最佳方法是什么?谢谢

executor = Executors.newFixedThreadPool(numThreads); 

while(some_condition==true) 
{ 
    //if(executor < 10 tasks pending) <---- how do i do this? 
    //{        
     for(int k=0;k<20;k++) 
     { 
      Runnable worker = new MyRunnable(); 
      executor.execute(worker); 
     } 
    //} 
    //else 
    //{ 
    //  wait(3000); 
    //} 
} 

更新使用旗语:

private final Semaphore semaphore = new Semaphore(10) 
executor = new ThreadPoolExecutorWithSemaphoreFromJohnExample(); 

while(some_condition==true) 
{ 

     Runnable worker = new MyRunnable(); 
     //So at this point if semaphore is full, then while loop would PAUSE(??) until 
     //semaphore frees up again. 
      executor.execute(worker); 
} 

回答

5

你可以使用一个简单的信号量。提交后获得新的许可证,并在完成后发布许可证,以允许任何其他人等待提交。

private final Semaphore semaphore = new Semaphore(10);//or however you want max queued at any given moment 
ThreadPoolExecutor tp= new ThreadPoolExecutor(...){ 
     public void execute(Runnable r){ 
      semaphore.acquire(); 
      super.execute(r); 
     }  
     public void afterExecute(Runnable r, Thread t){ 
     semaphore.release(); 
     super.afterExecute(r,t); 
     } 
}; 

所以这里提交的线程将被暂停,如果没有更多的许可证可用。

+0

好的约翰。它会被推荐用来捕获和处理'RejectedExecutionException',或者小心你的信号值小于阻塞队列的限制(如果有的话)。 – Gray 2012-07-19 21:14:35

+0

另外信号量(10)不是最大排队的数量,它是正在运行的+排队的数量。 – Gray 2012-07-19 21:15:41

+0

@格雷你说的都对,好点。我以此为起点,但如果OP要实施这些建议,则应考虑这些建议。 – 2012-07-19 21:22:19

8

我有下面的代码片段,它运行正常。但问题是它创建并将执行器队列上的2000多个任务放在了一边。要做到这一点

一种方法是创建自己的ThreadPoolExecutor用有限的作业队列,并在其上设置自定义RejectedExecutionHandler。这使您可以精确控制要排队的作业数量。

您需要自定义处理程序,因为默认情况下,如果队列已满,ThreadPoolExecutor.submit(...)将抛出RejectedExecutionException。通过下面的自定义处理程序,当它被队列拒绝时,拒绝处理程序会将其重新插入,直到队列有空间为止。所以没有工作会被拒绝/丢弃。

下面介绍如何启动自己的线程池并设置自己的拒绝处理程序。

// you can tune the blocking queue size which is the number of jobs to queue 
// when the NUM_THREADS are all working 
final BlockingQueue<MyRunnable> queue = 
    new ArrayBlockingQueue<MyRunnable>(NUM_JOBS_TO_QUEUE); 
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS, 
     0L, TimeUnit.MILLISECONDS, queue); 
// by default (unfortunately) the ThreadPoolExecutor will throw an exception 
// when you submit the job that fills the queue, to have it block you do: 
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
     // this will block if the queue is full as opposed to throwing 
     executor.getQueue().put(r); 
    } 
}); 
... 
// now submit all of your jobs and it will block if the queue is full 
for(int k = 0; k < 20000000; k++) { 
    Runnable worker = new MyRunnable(); 
    threadPool.execute(worker); 
} 

约阻塞线程池见我的答案在这里了解更多详情:

How can I make ThreadPoolExecutor command wait if there's too much data it needs to work on?

您还可以使用ThreadPoolExecutor.CallerRunsPolicy这将导致被提交作业到调用线程池来执行作业。然而,我不喜欢这个解决方案,因为它阻塞了调用者,直到作业结束,这可能会使其他工作线程饿死。另外,如果有多个提交者,它可能仍然会导致太多的线程运行这些作业。

最后,请注意我将ThreadPoolExecutor中的核心和最大线程数设置为相同的数字。不幸的是,默认情况下,执行程序启动核心线程,然后填充队列,然后才分配其他线程直到最大值。这完全违反直觉。

+0

但问题是我不想拒绝任务或抛出任何问题,我只是想等到任务队列有较少的待处理然后添加这些任务。如果创建一个计数器(int tasksRunning = 0),那么每次执行execute()时调用tasksRunning ++,并在runnable.run()完成时执行任务运行 - 谢谢 – user1539050 2012-07-19 20:55:07

+0

我的解决方案不会拒绝任务。这就是整个问题。我会更简单@ user1539050。 – Gray 2012-07-19 20:55:53

+0

也见http://stackoverflow.com/questions/2001086/how-to-make-threadpoolexecutors-submit-method-block-if-it-is-saturated – wrschneider 2012-07-19 21:07:51

3

我通常通过使用任务对象的对象“池队列”来阻止这样的系统 - 在启动时充满X任务的BlockingQueue。任何想要向线程提交任务的任务都必须从池队列中获取一个任务,然后将其加载到数据中,然后提交。

当任务完成并导致它处理完毕后,它将被推回到池队列中以供重新使用。

如果池清空,则在池队列上提交线程块,直到返回一些任务。

这实际上是@John Vint所提出的信号量控制的一种形式,但具有一些其他优点 - 例如,不会连续创建/ GC的可运行子系统。我喜欢将PooolQueue.size转储到定时器上的GUI状态栏,以便我可以看到系统如何“繁忙”(并且还能够快速检测到任何对象泄漏:)

2

您会更好地设置拒绝策略,因为你不想要压倒线程池,我发现最好的办法,而不复杂自己大有作为这是做这样的事情:

final ThreadPoolExecutor executor=(ThreadPoolExecutor)Executors.newFixedThreadPool(THREADS_COUNT); 
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); 

会发生什么事是,一旦所有线程忙着,调用者的线程将执行任务。这里是对这种政策的参考CallerRunsPolicy JavaDoc

+0

不会降低这一点,但只是要注意,铸造它是危险的,因为它不一定是一个ThreadPoolExecutor – KyleM 2013-12-18 20:17:07

+0

ExecutorService这是返回实例的类型没有拒绝执行策略的setter,如果你不放弃它,你将无法做到这一点。在这种情况下,通过不排队更多作业来避免执行。 – 2013-12-29 12:54:03

+0

执行程序通常比较棘手,需要来自Oracle/Sun的一些工作,例如在线程正在执行时没有拒绝策略阻塞队列作业,我不得不做一些不建议在我们公司支持这种功能的东西,直到框架编写者变得更好。 – 2013-12-29 12:56:08