2016-05-17 67 views
2

我是新来的spring amqp,我想创建两个不同行为的不同监听器。问题是我在编译时不知道队列名称,所以我不能使用this解决方案。java spring boot amqp具有不同行为的两个监听器

我想要做的事情是:从“sidechannel”队列中读取(然后删除)第一条消息,它应该看起来像这样{"queues":["queue1","queue2"]}

现在从队列1和队列2读取(然后删除)第一条消息。在此之后,请转到步骤1,阅读侧通道的第一条消息

我尝试使用不同的侦听器创建2个SimpleMessageListenerContainers,如下面的代码所示,但它不起作用,因为我认为它会起作用。

我的代码:

@SpringBootApplication 
public class Main implements CommandLineRunner { 

final static String queueName = "sidechannel"; 

@Autowired 
AnnotationConfigApplicationContext context; 

@Autowired 
RabbitTemplate rabbitTemplate; 

@Bean 
Queue queue() { 
    return new Queue(queueName, false); 
} 

@Bean 
TopicExchange exchange() { 
    return new TopicExchange("spring-boot-exchange"); 
} 

@Bean 
Binding binding(Queue queue, TopicExchange exchange) { 
    return BindingBuilder.bind(queue).to(exchange).with(queueName); 
} 

@Bean 
public ConnectionFactory rabbitConnectionFactory() { 
    CachingConnectionFactory connectionFactory = 
      new CachingConnectionFactory("localhost"); 
    connectionFactory.setUsername("guest"); 
    connectionFactory.setPassword("guest"); 
    return connectionFactory; 
} 

@Bean 
public SimpleMessageListenerContainer messageListenerContainer() { 
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); 
    container.setConnectionFactory(rabbitConnectionFactory()); 
    container.setQueueNames(queueName); 
    container.setMessageListener(sidechannelListener()); 
    return container; 
} 

@Bean 
public SimpleMessageListenerContainer messageListenerContainer2() { 
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); 
    container.setConnectionFactory(rabbitConnectionFactory()); 
    container.setQueueNames("queue1","queue2"); 
    container.setMessageListener(queueListener()); 
    return container; 
} 



@Bean 
public MessageListener sidechannelListener() { 
    return message -> { 
     String msg = new String(message.getBody()); 
     System.out.println(msg); 
     try { 
      Map<String, Object> map = jsonToMap(msg); 
      for (String name : (ArrayList<String>) map.get("queues")) { 
       System.out.println("Waiting for " + name + " message"); 
       rabbitTemplate.receive(name); 
      } 
     } catch (IOException e) { 
      e.printStackTrace(); 
     } 

    }; 
} 

@Bean 
public MessageListener queueListener() { 
    return message -> { 
     String msg = new String(message.getBody()); 
     System.out.println("Received message: "); 
     System.out.println(msg); 
    }; 
} 


public static void main(String[] args) throws InterruptedException { 
    SpringApplication.run(Main.class, args); 
} 

@Override 
public void run(String... args) throws Exception { 
    rabbitTemplate.setReceiveTimeout(-1); 

    while(true) { 
     System.out.println("Waiting for side channel message"); 
     rabbitTemplate.receive(queueName); 
    } 
//  context.close(); 
} 
} 

首先,由于某种原因,在侧通道队列中的消息将不被处理后除去。 其次,当我期待像这样的输出:

Waiting for side channel message 
{"queues":["queue1","queue2"]} 
Waiting for queue1 message 
Received message: 
"message from queue1" 
Waiting for queue2 message 
"message from queue2" 
Waiting for side channel message 

那就算我对这些不同的队列中接收到的消息,什么都不会发生(因为rabbitTemplate.setReceiveTimeout(-1);),但不知何故,它反应的是我收到的每封邮件...

另外,我不明白的是,如果我第一次将消息发送到侧通道,然后QUEUE1它是这样:

Waiting for side channel message 
{"queues":["queue1","queue2"]} 
Waiting for queue1 message 
Received message: 
"message from queue1" 

,现在,如果我再派(一秒)消息到queue1,它打印出th电子邮件,然后Waiting for queue2 message

所以它需要两个信息来继续周期...我不知道我在做什么错。

+0

对我来说看起来过于复杂,为什么不使用配置?在启动时指定队列名称(如果可以将它们放在交换机上,也可以将它们放入属性文件中)。 –

+0

这可能是非常复杂的,正如我所说的,我是春季amqp的新手,我仍然无法理解它。但由于我不知道编译时的队列名称,我不能将它们添加到属性文件,或者我错了吗?无论如何,Gary的回答帮助我解决了这个问题,感谢您的回复:) –

+0

您不需要在编译时了解它们。我建议阅读参考指南。你可以在启动时提供属性,属性可以在以后定义,你只需要一个固定的属性名称。 –

回答

0

你似乎在混合范例;您有侦听器容器,它们是由消息驱动的,并且您还在使用轮询(template.receive())。一般地,容器队列1,队列2将已经处理了来自这些队列中的消息和这个

  System.out.println("Waiting for " + name + " message"); 
      rabbitTemplate.receive(name); 

将永远阻止如果超时是< 0;因此原始消息将永远不会被删除。

+0

你是对的,我不知道听者容器是消息驱动的,不知怎的解释它,我可以解决它,非常感谢你! –