2017-08-16 250 views
2

我正在使用spring-rabbitmq,并且我可以成功获取消息。 但是当我调试时,我发现监听器创建一个线程,它会每1秒询问 消息。我认为速率太高,我想要做的是将速率设置为1分钟或任何其他。 我搜索了很多,但没有工作如何设置spring-rabbitmq收听消息的速率

我springrabbit.xml:

<rabbit:listener-container connection-factory="connectionFactory" message-converter="jsonMessageConverter" > 
     <rabbit:listener queues="notification" ref="messageReceiver"/> 
    </rabbit:listener-container> 

我的Java代码:

@Override 
     public void onMessage(Message message) { System.out.println("messagebody: "+new String(message.getBody())); 
      LOGGER.info(dateFormatUtil.getDateFormat(new Date())+new String(message.getBody())); 
      boolean result=false; 
      SendSingleEmailService sendSingleEmailService = new SendSingleEmailService(); 
      try { 
       result =sendSingleEmailService.send(new String(message.getBody())); 
      } catch (FileNotFoundException e) { 
       LOGGER.error(dateFormatUtil.getDateFormat(new Date())+"[NOTIFICATION] [ERROR] message is null!"); 
       e.printStackTrace(); 
      } 
      if(!result) { 
       try{ 
        throw new Exception(); 
       }catch (FileNotFoundException e) { 
        throw new RuntimeException(e); 
       }catch (Exception e) { 
        throw new RuntimeException(e); 
       }finally { 
        LOGGER.error(dateFormatUtil.getDateFormat(new Date())+"[NOTIFICATION] [ERROR] Send Email failed!"); 
       } 
      } 


     } 

是一些调试结果如下:

[2017-08-16 18:23:08,595]DEBUG 4286[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0 
[2017-08-16 18:23:09,600]DEBUG 5291[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0 
[2017-08-16 18:23:10,602]DEBUG 6293[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0 
[2017-08-16 18:23:11,603]DEBUG 7294[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0 
[2017-08-16 18:23:12,609]DEBUG 8300[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0 
[2017-08-16 18:23:13,612]DEBUG 9303[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0 
[2017-08-16 18:23:14,615]DEBUG 10306[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0 
[2017-08-16 18:23:15,617]DEBUG 11308[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0 
[2017-08-16 18:23:16,618]DEBUG 12309[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0 
[2017-08-16 18:23:17,619]DEBUG 13310[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0 

回答

0

您可以增加receiveTimeout - 请参见Message Listener Container Configuration

但是,容器对stop()请求的响应较少。

我认为你过分关注轮询速率 - 轮询用于阶段递送消息的内部队列的开销很小。

如果只是要删除(在调试时)日志“噪音”设置org.springframework.amqp.rabbit.listener.BlockingQueueConsumer日志类别INFOWARN

即将发布的2.0版本有一个新的DirectMessageListenerContainer,它不轮询内部队列并消除此问题。 Info here

编辑

听众仍然要求的RabbitMQ的消息每1秒

如果你仍然看到调试消息每隔1秒,你有没有配置正确receiveTimeout;它不是“询问rabbitmq”的消息,线程在等待receiveTimeout(并且发现兔子没有发送新消息)后醒来,因此它可以对stop()作出反应;然后它再次睡觉,直到有新消息到达或再次超时。如果没有消息可用,则不会与代理进行交互 - 消息由代理推送。

也许你误解了侦听器容器的用途。它用于消息驱动的应用程序 - 不能“减慢”消息到达的速度 - 它们是由代理推动的。

如果您希望每分钟只收到一条消息,则应该使用RabbitTemplate (或receiveAndConvert())方法而不是消息侦听器容器。

+0

谢谢,我阅读文档并尝试[receiveTimeout],然后我发现它只在我收到消息时才会发生,否则听众仍会每隔1秒向rabbitmq发送消息。我想设置监听器每隔1分钟询问一次消息。执行此操作吗?谢谢。 – shawn

+0

请参阅编辑我的答案。 –

+0

omg,我误解了它。谢谢,我再次尝试receiveAndConvert(),它的工作原理。 – shawn

相关问题