我是新来的spring amqp,我想创建两个不同行为的不同监听器。问题是我在编译时不知道队列名称,所以我不能使用this解决方案。java spring boot amqp具有不同行为的两个监听器
我想要做的事情是:从“sidechannel”队列中读取(然后删除)第一条消息,它应该看起来像这样{"queues":["queue1","queue2"]}
。
现在从队列1和队列2读取(然后删除)第一条消息。在此之后,请转到步骤1,阅读侧通道的第一条消息
我尝试使用不同的侦听器创建2个SimpleMessageListenerContainers,如下面的代码所示,但它不起作用,因为我认为它会起作用。
我的代码:
@SpringBootApplication
public class Main implements CommandLineRunner {
final static String queueName = "sidechannel";
@Autowired
AnnotationConfigApplicationContext context;
@Autowired
RabbitTemplate rabbitTemplate;
@Bean
Queue queue() {
return new Queue(queueName, false);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("spring-boot-exchange");
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(queueName);
}
@Bean
public ConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setQueueNames(queueName);
container.setMessageListener(sidechannelListener());
return container;
}
@Bean
public SimpleMessageListenerContainer messageListenerContainer2() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setQueueNames("queue1","queue2");
container.setMessageListener(queueListener());
return container;
}
@Bean
public MessageListener sidechannelListener() {
return message -> {
String msg = new String(message.getBody());
System.out.println(msg);
try {
Map<String, Object> map = jsonToMap(msg);
for (String name : (ArrayList<String>) map.get("queues")) {
System.out.println("Waiting for " + name + " message");
rabbitTemplate.receive(name);
}
} catch (IOException e) {
e.printStackTrace();
}
};
}
@Bean
public MessageListener queueListener() {
return message -> {
String msg = new String(message.getBody());
System.out.println("Received message: ");
System.out.println(msg);
};
}
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(Main.class, args);
}
@Override
public void run(String... args) throws Exception {
rabbitTemplate.setReceiveTimeout(-1);
while(true) {
System.out.println("Waiting for side channel message");
rabbitTemplate.receive(queueName);
}
// context.close();
}
}
首先,由于某种原因,在侧通道队列中的消息将不被处理后除去。 其次,当我期待像这样的输出:
Waiting for side channel message
{"queues":["queue1","queue2"]}
Waiting for queue1 message
Received message:
"message from queue1"
Waiting for queue2 message
"message from queue2"
Waiting for side channel message
那就算我对这些不同的队列中接收到的消息,什么都不会发生(因为rabbitTemplate.setReceiveTimeout(-1);
),但不知何故,它反应的是我收到的每封邮件...
另外,我不明白的是,如果我第一次将消息发送到侧通道,然后QUEUE1它是这样:
Waiting for side channel message
{"queues":["queue1","queue2"]}
Waiting for queue1 message
Received message:
"message from queue1"
,现在,如果我再派(一秒)消息到queue1,它打印出th电子邮件,然后Waiting for queue2 message
。
所以它需要两个信息来继续周期...我不知道我在做什么错。
对我来说看起来过于复杂,为什么不使用配置?在启动时指定队列名称(如果可以将它们放在交换机上,也可以将它们放入属性文件中)。 –
这可能是非常复杂的,正如我所说的,我是春季amqp的新手,我仍然无法理解它。但由于我不知道编译时的队列名称,我不能将它们添加到属性文件,或者我错了吗?无论如何,Gary的回答帮助我解决了这个问题,感谢您的回复:) –
您不需要在编译时了解它们。我建议阅读参考指南。你可以在启动时提供属性,属性可以在以后定义,你只需要一个固定的属性名称。 –