2017-08-25 125 views
1

在RabbitMQ的NACK消息,我有以下Consumer类监听队列上的传入消息,然后两个ACK和NACK他们。 ack部分工作正常,但nack不起作用。由于某种原因,所有的消息都会得到回应。无法使用SprintBoot

application.properties中

spring.rabbitmq.host=192.168.99.100 
spring.rabbitmq.port=5677 
spring.rabbitmq.username=abc 
spring.rabbitmq.password=def 
spring.rabbitmq.listener.acknowledge-mode=manual 

Producer.java

@Component 
public class Producer implements CommandLineRunner { 

    @Autowired 
    private RabbitTemplate rabbitTemplate; 

    @Autowired 
    private Queue queue; 

    @Override 
    public void run(String... args) throws Exception { 
     for (int i = 0; i < 100; i++) { 
      this.rabbitTemplate.convertAndSend(this.queue.getName(), "Hello World !"); 
     } 
    } 

} 

Consumer.java

@Component 
public class Consumer { 

    private final CountDownLatch latch = new CountDownLatch(1); 
    int ctr = 0; 

    @RabbitListener(queues = "producer-consumer-nack2.q") 
    public void receiveQueue(String text, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException, InterruptedException { 
     ctr++; 
     //nack every 10th, 20th, 30th and so on message 
     if (ctr % 10 == 0) { 
      System.out.println("Nack Message #" + ctr + ": " + text); 
      channel.basicNack(tag, false, true); 
     } else { 
      System.out.println("Ack Message #" + ctr + ": " + text); 
      channel.basicAck(tag, true); 
     } 
     latch.countDown(); 
    } 

    public CountDownLatch getLatch() { 
     return latch; 
    } 

} 

由于所有消息都被消耗,所以队列是空的(见下文)。 enter image description here

回答

2

我相信nack的工作方式是,它会拒绝然后将队列中的同一点处的重新传递的消息排队。 (见RabbitMQ的文档here

因此,当您正在寻找处理它的结束会被拒绝,然后再加工以后。

我建议调试代码,在您的NACK条件断点(或打印),看它是否击中该代码块。然后,如果你在nack之后但在下一个消息处理之前进行调试并检查你的队列 - 我想你会看到一条消息。