2013-04-11 57 views
0

也许这是一个愚蠢的问题,我错过了一些非常简单的事情,但我很困难。 我使用ActiveMQ的5.5.1和2.1.4 SIspring-jms OutboundGateway不接听回复

我CONFIGS片段:

--- SERVER ---

<beans:bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 
    <beans:property name="brokerURL" value="tcp://localhost:61616" /> 
</beans:bean> 


    <beans:bean id="listQueue" class="org.apache.activemq.command.ActiveMQQueue"> 
     <beans:constructor-arg name="name" value="LIST_QUEUE"/> 
    </beans:bean> 

    <beans:bean id="replyListQueue" class="org.apache.activemq.command.ActiveMQQueue"> 
     <beans:constructor-arg name="name" value="REPLY_LIST_QUEUE"/> 
    </beans:bean>    

    <channel id="replyListChannel"/> 
    <channel id="listIn" /> 
    <channel id="listDriver"/> 
    <channel id="listStock"/>  


    <jms:inbound-channel-adapter id="listInJms" 
     connection-factory="connectionFactory" 
     destination="listQueue" 
     channel="listIn"   
     auto-startup="true"> 
     <poller fixed-rate="3000"/> 
    </jms:inbound-channel-adapter> 



    <header-value-router input-channel="listIn" header-name="List" 
      default-output-channel="nullChannel">   
     <mapping value="Driver" channel="listDriver" /> 
     <mapping value="Stock" channel="listStock" /> 
    </header-value-router> 

    <jms:outbound-channel-adapter connection-factory="connectionFactory" 
     channel="replyListChannel" 
     destination="replyListQueue" 
     auto-startup="true">     
    </jms:outbound-channel-adapter> 

--- CLIENT ---

<beans:bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 
    <beans:property name="brokerURL" value="tcp://localhost:61616" /> 
</beans:bean>  

<beans:bean id="requestListQueue" class="org.apache.activemq.command.ActiveMQQueue"> 
    <beans:constructor-arg value="LIST_QUEUE"/> 
</beans:bean>   

<beans:bean id="replyListQueue" class="org.apache.activemq.command.ActiveMQQueue"> 
    <beans:constructor-arg value="REPLY_LIST_QUEUE"/> 
</beans:bean> 


<channel id="requestListChannel"> 
    <queue capacity="20"/> 
</channel> 

<channel id="listStockChannel"> 
    <queue capacity="20"/> 
</channel> 

<channel id="listDriverChannel"> 
    <queue capacity="20"/> 
</channel>  

<channel id="replyListChannel"/>     

<jms:outbound-gateway id="outListGW" 
    connection-factory="connectionFactory" 
    request-destination="requestListQueue" 
    request-channel="requestListChannel" 
    reply-destination="replyListQueue" 
    reply-channel="replyListChannel" 
    reply-timeout="20000" 
    receive-timeout="20000"> 
    <poller fixed-rate="5000" /> 
</jms:outbound-gateway>  


<header-value-router input-channel="replyListChannel" header-name="List" 
     default-output-channel="nullChannel">   
    <mapping value="Driver" channel="listDriverChannel" /> 
    <mapping value="Stock" channel="listStockChannel" /> 
</header-value-router> 

然后在代码中的一些地方,我手动执行请求和监听应答信道:

public static DriverList requestDriverList() { 

    Message<String> ldrm = MessageBuilder.withPayload("DriverList request"). 
      setHeader("List", "Driver").build();    
    try { 
     ApplicationContext ctx = 
     new ClassPathXmlApplicationContext("classpath:dmclnt/config/integ-context.xml"); 
     MessageChannel requestListChannel = 
       ctx.getBean("requestListChannel", MessageChannel.class); 
     QueueChannel listDriverChannel = 
       ctx.getBean("listDriverChannel", QueueChannel.class); 


     logger.info("Request for DriverList is sent to channel"); 

     Message dlm = listDriverChannel.receive(20000); 

     String xmlDL = (String)dlm.getPayload(); 

     JAXBContext jaxbctx = JAXBContext.newInstance(DriverList.class); 
     DriverList dl = (DriverList)jaxbctx.createUnmarshaller(). 
     unmarshal(newStringReader(xmlDL)); 
     logger.info("DriverList objct unmarshalled: "+dl.toString()); 
     return dl;    
    } catch (JAXBException e) { 
     logger.error("Error converting xmlDriverList to DriverList object",e); 
     return null; 
    } catch (RuntimeException e){ 
     logger.error(e); 
     return null; 
    } 
} 

但我收到

"MessageTimeoutException: failed to receive JMS response within timeout of: 20000ms" 

所有的时间。 当我查看服务器日志时,我发现具有正确有效负载的答复已成功从服务器发送到客户端,而且, 答复被置于REPLY_LIST_QUEUE中,正如我在ActiveMQ管理控制台中看到的。 没有更多的事情发生! REPLY_LIST_QUEUE中的消息With_Correct_Payload驻留在此队列中的Pending和Enqueued状态中。 没有消息出队。 看起来,尽管receive-timeout =“20000ms”的延迟远远足以得到答复,但JmsOutboundGateway不会接收来自回复目标队列的消息。 我在做什么错?

回答

1

这是因为出站网关对消息关联有一些期望。我认为,使用您的配置,网关希望服务器在关联ID中返回入站消息ID。它使用消息选择器来接收其答复。

如果您在服务器上使用入站网关,它将为您处理关联。

是否有任何特殊原因选择在服务器上使用离散通道适配器而不是入站网关?

您可能需要考虑升级到2.2.3,其中对出站网关进行了一些改进(但仍需要服务器进行适当的关联)。

编辑:每低于您的评论...

如果你想使用一对适配器代替,与当前的配置,你将不得不使用一个<header-enricher/>jms_messageid复制入站头jms_correlationId

或者,在客户端(出站网关),将correlation-key属性设置为JmsCorrelationId。这将导致网关在出站消息中填充该标头,并且在服务器端不需要任何东西。

+0

谢谢,加里,您的回复 – user1676178 2013-04-23 13:27:03

+0

谢谢,加里,您的回复,我很遗憾延迟评论 - 我一直在旅途中。事实上,事实证明,没有什么特别的不使用入站网关的原因。但我一直在想这是。:)。所以,对于出站和入站网关,一切工作都正常。但是,如果我真的有理由在服务器上创建两个单独的通道适配器(入站和出站)呢?我可能在服务器上建立什么样的关联?使用setCorrelationId,例如?AFAIK,使用服务激活器(即使用SI消息,并且我必须使用服务激活器)我无法获得JMS头文件? – user1676178 2013-04-23 13:46:06

+0

我更新了我的答案。 – 2013-04-23 14:55:26