我正在使用spring-rabbitmq,并且我可以成功获取消息。 但是当我调试时,我发现监听器创建一个线程,它会每1秒询问 消息。我认为速率太高,我想要做的是将速率设置为1分钟或任何其他。 我搜索了很多,但没有工作如何设置spring-rabbitmq收听消息的速率
我springrabbit.xml:
<rabbit:listener-container connection-factory="connectionFactory" message-converter="jsonMessageConverter" >
<rabbit:listener queues="notification" ref="messageReceiver"/>
</rabbit:listener-container>
我的Java代码:
@Override
public void onMessage(Message message) { System.out.println("messagebody: "+new String(message.getBody()));
LOGGER.info(dateFormatUtil.getDateFormat(new Date())+new String(message.getBody()));
boolean result=false;
SendSingleEmailService sendSingleEmailService = new SendSingleEmailService();
try {
result =sendSingleEmailService.send(new String(message.getBody()));
} catch (FileNotFoundException e) {
LOGGER.error(dateFormatUtil.getDateFormat(new Date())+"[NOTIFICATION] [ERROR] message is null!");
e.printStackTrace();
}
if(!result) {
try{
throw new Exception();
}catch (FileNotFoundException e) {
throw new RuntimeException(e);
}catch (Exception e) {
throw new RuntimeException(e);
}finally {
LOGGER.error(dateFormatUtil.getDateFormat(new Date())+"[NOTIFICATION] [ERROR] Send Email failed!");
}
}
}
是一些调试结果如下:
[2017-08-16 18:23:08,595]DEBUG 4286[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0
[2017-08-16 18:23:09,600]DEBUG 5291[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0
[2017-08-16 18:23:10,602]DEBUG 6293[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0
[2017-08-16 18:23:11,603]DEBUG 7294[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0
[2017-08-16 18:23:12,609]DEBUG 8300[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0
[2017-08-16 18:23:13,612]DEBUG 9303[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0
[2017-08-16 18:23:14,615]DEBUG 10306[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0
[2017-08-16 18:23:15,617]DEBUG 11308[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0
[2017-08-16 18:23:16,618]DEBUG 12309[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0
[2017-08-16 18:23:17,619]DEBUG 13310[SimpleAsyncTaskExecutor-1] - org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:186) - Retrieving delivery for Consumer: tag=[amq.ctag-5AR22lnMjmLAj329LDpGbQ], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), acknowledgeMode=AUTO local queue size=0
谢谢,我阅读文档并尝试[receiveTimeout],然后我发现它只在我收到消息时才会发生,否则听众仍会每隔1秒向rabbitmq发送消息。我想设置监听器每隔1分钟询问一次消息。执行此操作吗?谢谢。 –
shawn
请参阅编辑我的答案。 –
omg,我误解了它。谢谢,我再次尝试receiveAndConvert(),它的工作原理。 – shawn