2016-04-25 45 views
1

我必须使用Spring解码AMQP消息。为了处理它,我现在使用:使用Spring将AMQP消息解码为地图

// Configure queue. 
    RabbitAdmin admin = new RabbitAdmin(cf); 
    Queue queue = new Queue(queueName); 
    admin.declareQueue(queue); 
    FanoutExchange exchange = new FanoutExchange(exchangeName); 
    admin.declareExchange(exchange); 
    admin.declareBinding(BindingBuilder.bind(queue).to(exchange)); 

    // set up the listener and container 
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf); 

    MessageListenerAdapter adapter = new MessageListenerAdapter(listener); 
    container.setMessageListener(adapter); 
    container.setQueueNames(queueName); 
    container.start(); 

我的听众是

public class DataListener { 

    public void handleMessage(Object incomingMessage) { 
     LOGGER.error("AMQP: got message.{}", incomingMessage); 
    } 

} 

使用AmqpTemplate的convertAndSend方法发送的消息。没有配置给AmqpTemplate,一切都是默认的。

我怎么可能收到我的incomingMessage作为字段的HashMap?我不想强烈地将它耦合到特定的对象类型。

回答

3

假设你的意思是你的消息是一个POJO豆...

使用JSON - 在出站端使用Jackson2JsonMessageConverter,而不是默认SimpleMessageConverter,它使用Java序列化。

在接收端,相同的JSON转换器将尝试将传入流转换为原始POJO。

为了避免这种情况,请配置JSON消息转换器以将类名映射到HashMap而不是原始POJO。

您可以通过为转换器提供自定义DefaultJackson2JavaTypeMapper来完成此操作,该配置为将类名称从__TypeId__标头映射到java.util.HashMap

编辑

或者你可以简单地注入ClassMapper总是返回HashMap - 这里有一个快速启动应用程序,我写来说明该技术:

@SpringBootApplication 
public class So36837736Application { 

    public static void main(String[] args) throws Exception { 
     ConfigurableApplicationContext context = SpringApplication.run(So36837736Application.class, args); 
     context.getBean(RabbitTemplate.class).convertAndSend(new Foo("bar")); 
     Thread.sleep(10000); 
     context.close(); 
    } 

    @Bean 
    public RabbitTemplate template(ConnectionFactory connectionFactory) { 
     RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); 
     rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); 
     rabbitTemplate.setRoutingKey(queue().getName()); 
     return rabbitTemplate; 
    } 

    @Bean 
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) { 
     SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); 
     container.setQueues(queue()); 
     MessageListenerAdapter adapter = new MessageListenerAdapter(new Object() { 

      @SuppressWarnings("unused") 
      public void handleMessage(Map<String, Object> map) { 
       System.out.println("\n\n\n" + map + "\n\n\n"); 
      } 

     }); 
     Jackson2JsonMessageConverter messageConverter = new Jackson2JsonMessageConverter(); 
     ClassMapper classMapper = new ClassMapper() { 

      @Override 
      public void fromClass(Class<?> clazz, MessageProperties properties) { 
      } 

      @Override 
      public Class<?> toClass(MessageProperties properties) { 
       return HashMap.class; 
      } 

     }; 
     messageConverter.setClassMapper(classMapper); 
     adapter.setMessageConverter(messageConverter); 
     container.setMessageListener(adapter); 
     return container; 
    } 

    @Bean 
    public Queue queue() { 
     return new AnonymousQueue(); 
    } 

    public static class Foo { 

     private final String bar; 

     private Foo(String bar) { 
      this.bar = bar; 
     } 

     public String getBar() { 
      return this.bar; 
     } 

    } 

}