2017-12-18 268 views
0

我有以下问题需要解决: 我想实现一个简单的使用RabbitMQ消息传递的延迟重试机制。我有一个基础设施,可以让我延迟传递信息。我可以有任何想要在运行时利用这种延迟重试机制的感兴趣的参与者。如何注册一个队列,并且它是RabbitMQ和Spring的独占使用者/侦听器?

参与者只想给我提供2个细节和消息: 1.队列名称,他们希望在延迟T秒后传递消息。 2.队列的消费者(比如消息的消费者。)

我试图做到以下几点:

private void startSeparateListener(final Object messageConsumer, 
            final Queue queue) { 
     SimpleMessageListenerContainer simpleMessageListenerContainer 
        = myCustomeSimpleMessageListenerFactory.create(); 
     simpleMessageListenerContainer.setRabbitAdmin(rabbitAdmin); 
     simpleMessageListenerContainer.setQueues(queue); 
     simpleMessageListenerContainer.setMessageListener(new MessageListenerAdapter(messageConsumer)); 
     simpleMessageListenerContainer.start(); 

    } 

请注意,队列已被创建并与rabbitadmin已注册并且对象使用者有一个名为handleMessage的方法来侦听队列。

这是在运行时为队列的消息使用者动态注册队列的正确方法吗?

注: 春天已经提供了类型SimpleMessageListenerContainer一样的豆,但会使用bean添加队列和消费者的动态会导致发言权Q1的无意识消费的问题,被称为另一个队列的接收器的一部分,说Q2,其内容类型可能与Q1相同?

我尝试了很多关于它的搜索,但无法获得任何具体的解释。如果它是一个重复的问题和任何天真的话,事先道歉。

回答

0

我不能编译你的问题,但我可以告诉你已经有了RabbitMQ和Sring AMQP supports的Dealyed Exchange解决方案。

我建议远离动态添加SimpleMessageListenerContainer:它不像它看起来那么简单。有没有像addQueueNames()一个选项:

/** 
* Add queue(s) to this container's list of queues. The existing consumers 
* will be cancelled after they have processed any pre-fetched messages and 
* new consumers will be created. The queue must exist to avoid problems when 
* restarting the consumers. 
* @param queueName The queue to add. 
*/ 
@Override 
public void addQueueNames(String... queueName) { 

所以,你可能会考虑不增加新的集装箱,但添加新的队列,以现有的一个。 amqp_consumerQueue的下游路由可能有助于区分来自不同队列的消息。

+0

尊敬的@ artem-bilan,感谢您的回复。我的道歉是我不能很好地解决问题。我知道延迟交换插件,但是我已经创建了自己的延迟基础结构,使用交换和队列以及适当定义的x-message-ttl和x-deadletter-exchange。这整个infra发送所有的消息到汇兑。我想创建一个队列,将它绑定到交换机上,并在队列中注册一个消费者,全部在运行时。因此,我创建一个新的SimpleMessageListenerContainer并使用它的问题。我希望我能更好地解释它。 –

相关问题