2013-03-23 93 views
0

我正在测试使用JMS请求/回复骆驼和ActiveMQ的例子。当骆驼为你创建监听器时,我可以得到这个例子。即。
骆驼JMS请求回复远程MessageListener问题

from("direct:entryPoint").inOut("jms:queue:A"); 

from("jms:queue:A"). 
    process(new Processor() { 

     @Override 
     public void process(Exchange exchange) throws Exception { 
      exchange.getIn().setBody("Hello World."); 

     } 
    }); 



我现在遇到的问题是,我不能让JMS请求/应答与存在骆驼的JVM之外的消息监听工作。连接超时等待答复。我确信MessageListener正在向replyTo队列发送回复,并且我还设置了correlationId。我在这里做错了什么?我已经搜索了几天,试图找出这没有运气。提前感谢您的帮助。

下面是我正在使用的路线,我也将下面的MessageListener逻辑也放在下面。

from("direct:entryPoint"). 
    inOut("jms:queue:B?concurrentConsumers=4&requestTimeout=240000"); 

的MessageListener的onMessage队列B:

@Override 
public void onMessage(Message message) { 

    String msg = null; 
    ObjectMapper mapper = new ObjectMapper(); 
    mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, true); 
    String jsonOutput = null; 

    try{ 
     msg = ((TextMessage) message).getText(); 

     //convert message payload to purchase order 
     PurchaseOrder order = mapper.readValue(msg, PurchaseOrder.class); 

     //Set the id to see if the request reply worked. 
     order.setOrderId(BigInteger.valueOf(111111111)); 


     if(message.getJMSReplyTo() != null){ 
      Map<String, Object> headers = new HashMap<String, Object>(); 
      headers.put("JMSCorrelationID", message.getJMSCorrelationID()); 
      headers.put("JMSReplyTo", message.getJMSReplyTo().toString()); 
      jsonOutput = mapper.writeValueAsString(order); 

      //Camel runs on the external jvm so leverage the producerTemplate. 
      producerTemplate. 
       sendBodyAndHeaders("jms:"+ message.getJMSReplyTo().toString(), 
       jsonOutput, headers); 
     } 
    } 
    catch(Exception e){ 
     logger.fatal(e.getMessage()); 

     try { 
      if(message.getJMSReplyTo() != null){ 
       Map<String, Object> headers = new HashMap<String, Object>(); 
       headers.put("JMSCorrelationID", message.getJMSCorrelationID()); 

       producerTemplate. 
       sendBodyAndHeaders("jms:"+ message.getJMSReplyTo().toString(), 
e.getMessage(), headers); 
      } 
     } catch (JMSException e1) { 
      logger.fatal(e1.getMessage()); 
     } catch (Exception e1) { 
      logger.fatal(e1.getMessage()); 
     } 
    } 
} 
+0

你的第一个例子对我来说不起作用,因为第一条路线没有从第二条路线得到身体。我不得不在处理器中进行这种更改: exchange.setOut(exchange.getIn()); exchange.getOut()。setBody(“Hello World!”); – BPS 2015-01-28 19:31:57

回答

1
sendBodyAndHeaders("jms:"+ message.getJMSReplyTo().toString(), 
      jsonOutput, headers); 

通常解析为jms:queue://someQueue这可能会误事。除非使用precausions,否则使用转换为字符串的javax.jms.Destination通常不是一个好主意。

可以使用骆驼头CamelJmsDestinationheaders.put("CamelJmsDestination",message.getJMSReplyTo());

如果成功与否,我不知道。通常,尝试在Web控制台上使用ActiveMQ(或使用JMX(jconsole)连接到ActiveMQ)来查看队列,并尝试找出谁正在读取什么队列以及消息在哪里结束。这真的很有帮助。

+0

谢谢!这是问题。添加CamelJmsDestination为我工作。 – kdye43 2013-03-24 02:10:02