2014-11-06 89 views
0

使用txamqp客户端如下具体情况:如何重启消费和消费被拒绝的消息

  1. 声明所谓的 '消息' (类型=话题)
yield amqp.chan.exchange_declare(exchange='messaging', type='topic') 
交换
  1. 设置消费者
yield amqp.named_queue_declare(queue="submit.sm_all") 
yield amqp.chan.queue_bind(queue="submit.sm_all", exchange="messaging", routing_key="submit.sm.*") 
yield amqp.chan.basic_consume(queue="submit.sm_all", no_ack=False, consumer_tag='qtag') 
  • 发布50消息
  • for i in range(50): 
         yield amqp.publish(exchange='messaging', routing_key="submit.sm.connector01", 
          content=Content(str(i))) 
    
  • 启动消费者用回调重新消耗所有消费的消息
  • queue = yield amqp.client.queue('qtag') 
    queue.get().addCallback(self._callback_reject_and_requeue_all).addErrback(self._errback) 
    
  • 停止5秒后消费者:
  • yield queue.close() 
    

    在这个阶段,该队列是仍然充满了50条消息,因为它们都被拒绝并重新排队(回调被多次触发)。

    再次
  • 启动消费者:
  • queue = yield amqp.client.queue('qtag') 
    queue.get().addCallback(self._callback).addErrback(self._errback) 
    
  • 停止后消费5秒
  • yield queue.close() 
    

    问题是,在步骤6中启动使用者后,从未触发回调,并且队列中仍保留着50条消息。

    注:

    • 消息被拒绝是这样的:
    yield amqp.chan.basic_reject(delivery_tag=message.delivery_tag, requeue = 1) 
    

    回答

    0

    为了干净地停止消费者(步骤5),basic_cancel必须使用:

    收率amqp.chan:

    5秒后
  • 停止消费者.basic_cancel(consumer_tag = 'qtag')

  • 开始再次消费者:

    收率amqp.chan.basic_consume(队列= “submit.sm_all”,NO_ACK =假,consumer_tag = 'qtag') 队列=产量amqp.client.queue( 'qtag') queue.get()的addCallback(self._callback).addErrback(self._errback)

  • 0

    有没有区别的消息是否已经被拒绝或没有 - 它会驻留在顶部队列,并且可以被任何消费者选择(或者如果使用TTL或长度限制将会从队列中移除,并且这样的限制将会达到)。

    您不能只消耗先前拒绝的消息,而无法在服务器端定义它。事实上,你只能从队列中消耗一个消息(它们是严格的FIFO队列)。

    作为一种解决方法,您可以设置Dead Letter Exchanges并拒绝带有requeue=false的消息,然后根据DLX路由流程将它们移动到目标队列。然后,您可以从那里消费被拒绝的消息,但通常情况下,通常会拒绝消息与消耗的队列相同,除非需要特殊的逻辑。

    你也可以重新发布你想要拒绝的地方,你想要的地方,它甚至听起来有点生疏。

    P.S:

    注意,当你在函数中调用yield,它不运行函数体而是返回发生器对象

    +0

    你的意思是拒绝的邮件会去当前队列?如果是这种情况,那么当我再次用相同的队列启动我的消费者时,为什么它看不到被拒绝的消息? – 2014-11-07 17:27:33

    +0

    我已经添加了关于yield语句的注释。使用'return'而不是'yield'呢?它应该有所帮助。 – pinepain 2014-11-07 20:24:54

    +0

    与yield没有关系,它只是阻塞,直到我从调用中得到响应,我用它来简化代码并且不使用回调(amqp对象是一个使用扭曲框架的txamqp客户端,基于事件的一个事件) – 2014-11-08 08:13:53