7

我对生产者 - 消费者模式的理解是,它可以使用生产者和消费者之间共享的队列来实现。生产者将工作提交给共享队列,消费者检索并处理它。它也可以由生产者直接提交给消费者实施(生产者线程直接提交给消费者的执行者服务)。生产者消费者 - 使用Executors.newFixedThreadPool

现在,我一直在寻找提供线程池的一些常见实现的Executors类。根据规范,newFixedThreadPool方法“重用固定数量的线程,在共享的无界队列中运行”。他们在这里谈论哪个队列?

如果生产者直接向消费​​者提交任务,它是包含Runnables列表的ExecutorService的内部队列吗?

或者它是中间队列,以防生产者提交到共享队列?

可能是我错过了整点,但有人请澄清?

回答

4

你是对的,ExecutorService不仅是一个线程池,它是一个完整的生产者 - 消费者实现。这个内部队列实际上是一个线程安全的队列Runnable(确切地说是FutureTask),它持有你的任务submit()

池中的所有线程在该队列上被阻塞,等待任务被执行。当你有一个任务时,恰好有一个线程会把它拿起来并运行它。当然submit()不等待池中的线程完成处理。

在另一方面,如果你提交的任务(或长期运行的)的数量巨大,你可能最终向上与被占用池中的所有线程和一些任务在队列中等待。一旦任何线程完成其任务,它将立即从队列中选择第一个线程。

+0

只是为了澄清:'ExecutorService'只是一个接口。你可以在一个线程中执行'ExecutorService'类,它只要运行一个可运行的类就可以运行每个可运行的类(我相信'java.util.concurrent'包中有一个实现就是这么做的) 。但在实践中*,大多数Exec​​utorService实现是完整的生产者 - 消费者实现。 –

+0

你是绝对正确的,通过'ExecutorService''我的意思是“*执行'ExecutorService' *”的'Executors.newFixedThreadPool()'返回的东西。感谢您的澄清。 –

+1

谢谢你们。所以,如果我创建使用的newFixedThreadPool(8),然后就可以执行大约1000可运行任务的执行服务,请确认一下我的情况的理解: 1.最多8个线程将在处理开始创建 2 ,而8个线程正忙,992个任务将被保存在内部队列中 3.另外,因为它是一个无界队列,所以我可以提交给执行程序服务的任务数量没有上限。 如果我使用有界队列创建ExecutorService,上述场景会产生什么影响?它会表现更好吗? 谢谢,O. – Oxford

1
public class Producer extends Thread { 
    static List<String> list = new ArrayList<String>(); 

    public static void main(String[] args) { 
     ScheduledExecutorService executor = Executors 
       .newScheduledThreadPool(12); 
     int initialDelay = 5; 
     int pollingFrequency = 5; 
     Producer producer = new Producer(); 
     @SuppressWarnings({ "rawtypes", "unused" }) 
     ScheduledFuture schedFutureProducer = executor.scheduleWithFixedDelay( 
       producer, initialDelay, pollingFrequency, TimeUnit.SECONDS); 
     for (int i = 0; i < 3; i++) { 
      Consumer consumer = new Consumer(); 
      @SuppressWarnings({ "rawtypes", "unused" }) 
      ScheduledFuture schedFutureConsumer = executor 
        .scheduleWithFixedDelay(consumer, initialDelay, 
          pollingFrequency, TimeUnit.SECONDS); 
     } 

    } 

    @Override 
    public void run() { 
     list.add("object added to list is " + System.currentTimeMillis()); 
           ///adding in list become slow also because of synchronized behavior 
    } 
} 

class Consumer extends Thread { 

    @Override 
    public void run() { 
     getObjectFromList(); 
    } 

    private void getObjectFromList() { 
     synchronized (Producer.list) { 
      if (Producer.list.size() > 0) { 
       System.out.println("Object name removed by " 
         + Thread.currentThread().getName() + "is " 
         + Producer.list.get(0)); 
       Producer.list.remove(Producer.list.get(0)); 
      } 
     } 
    } 
}