使用txamqp客户端如下具体情况:如何重启消费和消费被拒绝的消息
- 声明所谓的 '消息' (类型=话题)
交换yield amqp.chan.exchange_declare(exchange='messaging', type='topic')
- 设置消费者
yield amqp.named_queue_declare(queue="submit.sm_all") yield amqp.chan.queue_bind(queue="submit.sm_all", exchange="messaging", routing_key="submit.sm.*") yield amqp.chan.basic_consume(queue="submit.sm_all", no_ack=False, consumer_tag='qtag')
- 发布50消息
- 启动消费者用回调重新消耗所有消费的消息
- 停止5秒后消费者:
- 启动消费者:
- 停止后消费5秒:
- 消息被拒绝是这样的:
for i in range(50): yield amqp.publish(exchange='messaging', routing_key="submit.sm.connector01", content=Content(str(i)))
queue = yield amqp.client.queue('qtag') queue.get().addCallback(self._callback_reject_and_requeue_all).addErrback(self._errback)
yield queue.close()
在这个阶段,该队列是仍然充满了50条消息,因为它们都被拒绝并重新排队(回调被多次触发)。
再次queue = yield amqp.client.queue('qtag') queue.get().addCallback(self._callback).addErrback(self._errback)
yield queue.close()
问题是,在步骤6中启动使用者后,从未触发回调,并且队列中仍保留着50条消息。
注:
yield amqp.chan.basic_reject(delivery_tag=message.delivery_tag, requeue = 1)
你的意思是拒绝的邮件会去当前队列?如果是这种情况,那么当我再次用相同的队列启动我的消费者时,为什么它看不到被拒绝的消息? – 2014-11-07 17:27:33
我已经添加了关于yield语句的注释。使用'return'而不是'yield'呢?它应该有所帮助。 – pinepain 2014-11-07 20:24:54
与yield没有关系,它只是阻塞,直到我从调用中得到响应,我用它来简化代码并且不使用回调(amqp对象是一个使用扭曲框架的txamqp客户端,基于事件的一个事件) – 2014-11-08 08:13:53