2012-04-27 88 views
10

我正在从队列服务器获取数据,我需要处理它并发送确认。事情是这样的:如何让ThreadPoolExecutor命令等待数据太多需要处理?

while (true) { 
    queueserver.get.data 
    ThreadPoolExecutor //send data to thread 
    queueserver.acknowledgement 

我不完全理解线程会发生什么,但我认为这个程序获取数据,将它的线程,然后立即确认。所以即使我有每个队列的限制只能有200个未确认的项目,它只会尽可能快地收到它。当我在单个服务器上编写程序时,这很好,但是如果我使用多个工作人员,那么这会成为一个问题,因为线程队列中的项目数量不是它完成的工作的反映,而是它的速度可以从队列服务器获取项目。

有什么我可以做以某种方式使程序等待,如果线程队列已满的工作?

回答

20

如何让ThreadPoolExecutor命令等待数据是否需要处理太多?

我不是100%确定我在这里理解你的问题。当然,不是一个开放式的队列,你可以用它限制使用BlockingQueue

BlockingQueue<Date> queue = new ArrayBlockingQueue<Date>(200); 

在提交给ExecutorService就业方面,而不是使用默认的ExecutorService小号创建使用Executors,其使用无界队列中,您可以创建自己的队列:

return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, 
       new ArrayBlockingQueue<Runnable>(200)); 

一旦队列填满,它将导致它拒绝任何提交的新任务。您需要设置提交给队列的RejectedExecutionHandler。例如:

final BlockingQueue queue = new ArrayBlockingQueue<Runnable>(200); 
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(nThreads, nThreads, 
      0L, TimeUnit.MILLISECONDS, queue); 
// by default (unfortunately) the ThreadPoolExecutor will throw an exception 
// when you submit the 201st job, to have it block you do: 
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() { 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
     // this will block if the queue is full 
     executor.getQueue().put(r); 
    } 
}); 

我认为这是一个重大的遗漏,Java没有ThreadPoolExecutor.CallerBlocksPolicy

+0

谢谢格雷。为了澄清这一点,如果我将200个物品放入队列中,它将不会再允许,直到队列中的工作关闭为止。如果是这种情况,它只是等待我用来提交给线程的命令,直到有空间发送到阻塞队列为止? – 2012-04-27 15:32:43

+0

如果在'BlockingQueue' @learningJava上使用'put()',那么是的,它会阻塞队列是否已满,然后在队列大小减少时继续。 – Gray 2012-04-27 15:50:17

+0

+1:另一种选择是让RejectedExecutionHandler运行该任务。即'r.run();' – 2012-04-27 16:37:09

2

如果您希望在工作人员开始处理任务时确认,则可以在执行实际工作之前自定义ThreadFactory从线程发送确认。或者您可以覆盖ThreadPoolExecutorbeforeExecute

如果您想在一个新的工作解放出来承担新任务的确认,我想你可以初始化一个SynchronousQueueThreadPoolExecutor.CallerRunsPolicy一个ThreadPoolExecutor,或者用自己的策略,即主叫块。

0

首先,我认为你的态度是错误的,因为你在你的伪代码确实是忙等待,您应该通过从Java并发教程toturial http://docs.oracle.com/javase/tutorial/essential/concurrency/

读忽视了,病提供你与繁忙的解决方案等待(不建议报告):

 ExecutorService e1 = Executors.newFixedThreadPool(20); 
    while (true) { 
      if (!serverq.isEmpty() && !myq.isFull()) myq.enq(serverq.poll()); 
      if (!myq.isEmpty()) e1.execute(myq.poll()); 
    } 

注:

1.确保你的MYQ是同步的,在其他的答案说。你可以扩展一些阻塞队列以确保同步是正确的。

2。你实现了一个可运行的类,它在服务的迭代 中执行你从服务器执行的操作,那些可运行的程序必须将myq作为参数传递给构造函数并将其保存为全局变量。

3.myq获取runnables,在运行方法结束时,您必须确保runnable从myq删除本身

+0

OP的'while(true)'没有在等待。你的while(true)** **试图忙于等待,并没有解决问题,即执行者的默认队列是无界的,所以它将继续接受新的作业,而不管完成了多少。 – trutheality 2012-04-27 17:15:39

+0

myp应该是有界的人,你没有注意代码和笔记中写的东西 – 2012-04-27 18:17:30

0

如何使一个blockingPool不会执行超过200个任务,并在提交201任务之前等待任务完成。我已经在我的应用程序中使用信号量达到了它。您也可以通过将值传递给其构造函数来更改限制。

与@Gray答案的区别在于,在这种情况下很少有任何任务会被拒绝。信号量将使任何201任务等待,除非其他任务结束。尽管如此,如果有任何拒绝,我们有拒绝处理人员将该任务重新提交给执行人。

private class BlockingPool extends ThreadPoolExecutor { 
    private final Semaphore semaphore;  
    public BlockingPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, int tasksAllowedInThreads){  
     super(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,new RejectedExecutionHandler() { 
      @Override 
      public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { 
       executor.execute(r); 
      } 
     }); 
     semaphore = new Semaphore(tasksAllowedInThreads); 
    } 

    @Override 
    public void execute(Runnable task){ 
     boolean acquired = false; 
     do{ 
      try{ 
       semaphore.acquire(); 
       acquired = true; 
      } catch (final InterruptedException e){ 
       // log 
      } 
     } while (!acquired); // run in loop to handle InterruptedException 
     try{ 
      super.execute(task); 
     } catch (final RejectedExecutionException e){ 
      System.out.println("Task Rejected"); 
      semaphore.release(); 
      throw e; 
     } 
    }  

    @Override 
    protected void afterExecute(Runnable r, Throwable t){ 
     super.afterExecute(r, t); 
     if (t != null){ 
      t.printStackTrace(); 
     } 
     semaphore.release(); 
    } 
} 

这是否合理!