2016-12-16 142 views
2

我想要实现使用兔子的操作,即拒绝/延迟循环:延迟的消息循环与RabbitMQ的

我有:

  1. 主队列与主要交易所绑定到它和DLX为待机交换。
  2. 为StandBy排队待机Exchange绑定到其与60年代TTL和DLX主交换

基本上我想:

  1. 从主队列消耗
  2. 拒绝消息(在某​​些circunstances)
  3. 将被重定向到StandBy队列,因为拒绝
  4. 当TTL到期时,将消息重新排队到主队列。

步骤1,2和3都正常,但最后一个放弃消息而不是重新排队它。

有些理论是从的RabbitMQ的文档我来设计,这是:从队列

消息可以是“死字母”;即,当以下任一事件的发生重新发布到另一个交换:

  1. 该消息被拒绝,并重新排队=假(basic.reject或basic.nack),
  2. 该消息的TTL届满;或
  3. 超过队列长度限制。

...

有可能形成消息的死文字的周期。例如,当一个队列在没有指定死信路由密钥的情况下将消息发送到默认交换机时,会发生这种情况。如果在整个周期内没有拒绝,这些周期中的消息(即达到相同队列两次的消息)将被丢弃。

理论认为应该重新排队,因为它有一个拒绝从第2步的周期,因此,你能帮助我弄清楚为什么它下降的消息,而不是重新排队它?

UPDATE:

我针对的版本是2.8.4,似乎在那一刻if there was no rejections in the entire cycle是不是在使用的情况下,反正你可以检查此您ownselves RabbitMQ 2.8.x Docs

我将接受@george答案,因为通过此代码可以实现原始目标。

回答

1

Rafael,我不确定你在用什么客户端,但是用Python中的Pika客户端,你可以实现类似这样的事情。为了简单起见,我只使用一次交换。你确定你正在设置交换机和路由键吗?

sender.py

import sys 
import pika 
connection = pika.BlockingConnection(pika.ConnectionParameters(
       'localhost')) 
channel = connection.channel() 
channel.exchange_declare(exchange='cycle', type='direct') 
channel.queue_declare(queue='standby_queue', 
         arguments={ 
          'x-message-ttl': 10000, 
          'x-dead-letter-exchange': 'cycle', 
          'x-dead-letter-routing-key': 'main_queue'}) 
channel.queue_declare(queue='main_queue', 
         arguments={ 
          'x-dead-letter-exchange': 'cycle', 
          'x-dead-letter-routing-key': 'standby_queue'}) 
channel.queue_bind(queue='main_queue', exchange='cycle') 
channel.queue_bind(queue='standby_queue', exchange='cycle') 
channel.basic_publish(exchange='cycle', 
         routing_key='main_queue', 
         body="message body") 
connection.close() 

receiver.py

import sys 
import pika 
def callback(ch, method, properties, body): 
    print "Processing message: {}".format(body) 
    # replace with condition for rejection 
    if True: 
     print "Rejecting message" 
     ch.basic_nack(method.delivery_tag, False, False) 
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 
channel = connection.channel() 
channel.basic_consume(callback, queue='main_queue') 
channel.start_consuming() 
+0

@george您好,感谢您的回复,我使用Python和鼠兔藏汉,我有一个非常类似的代码,但我实际上运行你的代码的结果相同,是什么让我觉得,你使用的是哪个版本的RabbitMQ ?,我试过2.8.4和3.6.6版的代码 –

+0

我会接受你的问题,但我已经尝试过这两个服务器和最新版本的作品。然后,我进一步研究,旧文档没有提到“拒绝”异常,所以我想在这种情况下,它会放弃任何尝试刷新的消息 –