2016-11-28 130 views
0

我对Spring的消息处理相当陌生,所以请耐心等待。同时处理RabbitMQ消息的Spring

我想我的RabbitMQ消息处理程序在多个线程同时处理的消息。

@Component 
public class ConsumerService { 

    @RabbitListener(queues = {"q"}) 
    public void messageHandler(@Payload M msg) { 
     System.out.println(msg);   
    } 

} 

... 

@Configuration 
@Import({MessageConverterConfiguration.class, ConsumerService.class}) 
public class ConsumerConfiguration { 

    @Autowired 
    private ConnectionFactory connectionFactory; 

    @Bean 
    public List<Declarable> declarations() { 
     return Arrays.asList(
      new DirectExchange("e", true, false), 
      new Queue("q", true, false, false), 
      new Binding("q", Binding.DestinationType.QUEUE, "e", "q", null) 
     ); 
    } 

    @Bean 
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(MessageConverter contentTypeConverter, SimpleRabbitListenerContainerFactoryConfigurer configurer) { 
     SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); 
     factory.setConcurrentConsumers(10); 
     configurer.configure(factory, connectionFactory); 
     factory.setMessageConverter(contentTypeConverter); 
     return factory; 
    } 
} 

在我的小测试中,队列“q”上有4条消息。我得到处理他们。没事儿。但我会逐一处理它们。如果我在“ConsumerService.messageHandler”中设置了一个断点(实质上延迟了处理消息的完成),我想最终在该断点处有4个线程。但我从来没有超过一个线程。只要我让它运行完成消息的处理,就可以处理下一条消息。我需要做什么才能同时处理这些消息?

回答

0

对不起,我忘了写,我得到它的工作。基本上我现在拥有的是:

... 
factory.setConcurrentConsumers(10); 
factory.setMaxConcurrentConsumers(20); 
factory.setConsecutiveActiveTrigger(1); 
factory.setConsecutiveIdleTrigger(1); 
factory.setPrefetchCount(100); 
... 

我单独concurrentConsumers相信它实际上将最终(下足够的负载)并行处理的消息。问题在于我在我的小测试中只有4条消息,并且它永远不会为打开多个消费者(-thread)而烦恼。将consecutiveActiveTrigger设置为1有助于此处。猜猜prefetchCount也有话要说。无论如何,案件关闭。

1

有实现这一

  • 或者使用线程池在你的消费者处理messae处理的方法有两种。
  • 或者,创建多个消费者。

我看见你正在使用concurrentConsumers属性来自动处理由Spring AMQP创建多个消费者。尝试将PrefetchCount设置为1,并设置MaxConcurrentConsumers。

最有可能你已经在队列中的四个消息和预取计数的默认值是唯一一个消费者消耗大量的所有消息出现在队列中。