2017-06-15 61 views
2

我已经设置了ThreadPoolExecutor并启动线程以使用阻塞队列中的数据。 在启动时(当我调用下面的startThread时),阻塞队列是空的。 我已经设置线程的超时时间非常大,以便它们不会死亡。 阻塞队列在WorkerThreadPoolExecutor的范围之外创建,并且Runnable项目放在它上面。ThreadPoolExecutor不使用数据

public class WorkerThreadPoolExecutor extends ThreadPoolExecutor { 

    private final MyBlockingQueue<MyRunnable> blockingQueue; 
    private ScheduledExecutorService statsExecutor = null; 

    public WorkerThreadPoolExecutor(MyBlockingQueue myBlockingQueue) { 

     super(5, 10, 5, TimeUnit.MINUTES, myBlockingQueue); 
     this.blockingQueue = myBlockingQueue; 

    } 

    @Override 
    public void shutdown() { 
     logger.info("Shutting down the stats emitter!"); 
     super.shutdown(); 
     if (statsExecutor != null) { 
      statsExecutor.shutdown(); 
     } 
    } 

    public void startThreads() { 
     logger.info("Starting the WorkerThreadPoolExecutor!!!"); 
     this.prestartCoreThread(); 
     emitStats(); 
    } 

    public void numThds() { 
     System.err.println("\t\t active: " + this.getActiveCount()); 
     System.err.println("\t\t completed taskCount: " + this.getCompletedTaskCount()); 
     System.err.println("\t\t core: " + this.getCorePoolSize()); 
     System.err.println("\t\t poolsize: " + this.getPoolSize()); 
     System.err.println("\t\t taskCount: " + this.getTaskCount()); 
     System.err.println("\t\t Q-Size: " + this.getQueue().size()); 

     //System.err.println("X Size is: -------------> " + blockingQueue.currentSize()); 
     System.err.println("X Size is: -------------> " + blockingQueue.getBlockingQueue().size()); 
     System.err.println("X Size is: -------------> " + this.getQueue().size()); 
    } 

    public void emitStats() { 

     this.statsExecutor = Executors.newScheduledThreadPool(1); 

     final Runnable emitStats = new Runnable() { 
      public void run() { 
       System.err.println("Stats id: " + blockingQueue.id); 
       //System.err.println("Size is: -------------> " + blockingQueue.currentSize()); 
       System.err.println("Stats size is: -------------> " + blockingQueue.getBlockingQueue().size()); 

       numThds(); 
      } 
     }; 
     statsExecutor.scheduleAtFixedRate(emitStats, 2, 2, TimeUnit.SECONDS); 
    } 

} 

阻挡队列上面的范围和项目穿上它之外创建:

BlockingQueue<MyRunnable> blockingQueue = new LinkedBlockingQueue() 

项目被添加到队列中进行处理,但它们决不出队。 我添加了产生下列结果为统计度量:

Stats size is: -------------> 2 
    active: 0 
    completed taskCount: 0 
    core: 5 
    poolsize: 0 
    taskCount: 2 
    Q-Size: 2 
X Size is: -------------> 2 
X Size is: -------------> 2 

如何可以强制将采取关闭阻塞队列中的项目并执行?

为MyRunnalbe的代码是:

public class MyRunnable implements Runnable { 
    private int x; 
    public MyRunnable(int x) { 
     this.x = x; 
    } 
    public void run() { 
     System.out.println("----> " + x); 
    } 
} 

我通过调用创建它的一个实例:

MyRunnable mr = new MyRunnable(3); 

,并通过调用排队:

blockingQueue.add(mr); 
+0

看来尽管活动线程数为0是可疑的! –

回答

1

两件事情:

  • 在每次调用emitStats()期间绝对不需要创建新的执行器服务。相反,这与这种服务的整个想法相矛盾。
  • 您的示例中没有代码会实际消耗tgat队列中的元素。打印其大小不会改变队列!
+0

1.我添加了emitStats以查看随着时间的推移发生了什么 2.放入阻塞队列的项目是可运行的;不应该将队列中的线程池阻塞,并且线程会从队列中获取并从队列中取出项目,并在它将其出队后执行它? –

+0

线程完成您指示他们执行的任何操作。再说一遍:你没有显示任何可以做到的代码。计算机不能用魔法工作 - 他们确实按照你的代码告诉他们做的事情。 – GhostCat

+0

问题似乎在这里,池大小为0,我似乎无法(重新启动它们)。 –