我在我的消费者内部抛出一个AmqpException。 我的期望是消息将按照先进先出的顺序返回到队列,并且将来会在某个时间进行重新处理。rabbitmq与春天amqp - 消息卡住的情况下AmqpException
Spring AMQP似乎并未将消息释放回队列。而是试图一遍又一遍地重新处理失败的消息。 这会阻止正在处理的新到消息。卡住的那些永久出现在AMQP控制台内的“未打包”状态。
有什么想法?
我在我的消费者内部抛出一个AmqpException。 我的期望是消息将按照先进先出的顺序返回到队列,并且将来会在某个时间进行重新处理。rabbitmq与春天amqp - 消息卡住的情况下AmqpException
Spring AMQP似乎并未将消息释放回队列。而是试图一遍又一遍地重新处理失败的消息。 这会阻止正在处理的新到消息。卡住的那些永久出现在AMQP控制台内的“未打包”状态。
有什么想法?
这就是rabbitmq/Spring AMQP的工作方式;如果消息被拒绝(引发任何异常),则默认情况下会重新发送消息,并将其放回到队列的头部,以便立即重试。
...在未来某个时间进行重新处理。
您必须适当地配置事情才能实现此目的。
首先,您必须告诉经纪人不要重新发送消息。这是通过将侦听器容器上的defaultRequeueRejected
设置为false(默认情况下为true)完成的。或者,您可以抛出指示容器拒绝(而不是重新发送)单个消息的AmqpRejectAndDontRequeueException
。
但这并不是结束;只是这样做只会导致被拒绝的信息被丢弃。
为避免这种情况,您必须为队列设置Dead Letter Exchange/Queue - 拒绝的消息然后发送到DLX/DLQ而不是被丢弃。通常建议使用策略而不是队列参数。
最后,您可以在DLQ上设置一条消息生存时间,以便在此之后消息从队列中移除。如果您在上设置了另一个适当的无效信函交换队列(DLQ),则可以使该消息在时间到期后重新排队回原始队列。
请注意,这只适用于从原始队列中拒收的货物;在该队列中的消息过期时它将不起作用。
请参阅this answer及其问题的一些链接了解更多详情。
您可以使用x-death
标头的内容来决定是否应该在尝试一些次数后完全放弃(捕获异常并以某种方式处置坏消息;不要抛出异常并且容器将响应信息)。
这是我用来解决这个问题的解决方案。我设置了一个拦截器,在应用退避策略的同时重试消息x次。 http://trippstech.blogspot.com/2016/03/rabbitmq-deadletter-queue-with.html
Gary,谢谢您的反馈。 – Vladimir 2015-02-06 04:26:29
我希望能够使用Spring AMQP RetryOperationsInterceptor,它会在抛出AmqpException时启动。一旦达到最大重试次数,“Recoverer”将抛出一个AmqpRejectAndDontRequeueException,这将抛弃该消息。我想到了重试拦截器的重试尝试(可能需要几个小时的指数退避),消息会返回队列,从而允许处理新到的消息。 – Vladimir 2015-02-06 04:32:52
否;重试拦截器在将其传递给侦听器之前仅阻塞该线程;它只适用于重试之间的短暂延迟;你需要像我描述的那样使用经纪人来推迟重新投资。消费者没有其他办法可以告诉经纪人在重新投递之前等待一段时间;它只是不是amqp协议的一部分。即使使用JMS,这种延迟也是代理商专有的,不属于jms API。 – 2015-02-06 04:41:33