2012-03-27 76 views
2

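ActiveMQ: Proper configuration for using both queues (with concurrent consumers) and topics

We have an ActiveMQ/Camel configuration that previously used only message queues, with concurrent consumers. We are now introducing message topics and are finding that, because of the concurrent consumers, each message received on a topic is consumed multiple times.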

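What is the correct configuration for this scenario? That is, we want multiple concurrent consumers for messages received on queues, but only a single consumer for messages received on topics.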

Here is the current configuration:

<amq:connectionFactory id="amqConnectionFactory" 
    useAsyncSend="true" brokerURL="${${ptl.Servername}.jms.cluster.uri}" 
    userName="${jms.username}" password="${jms.password}" sendTimeout="1000" 
    optimizeAcknowledge="true" disableTimeStampsByDefault="true"> 
</amq:connectionFactory> 

<bean id="cachingConnectionFactory" 
    class="org.springframework.jms.connection.CachingConnectionFactory"> 
    <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> 
    <property name="cacheConsumers" value="true"></property> 
    <property name="cacheProducers" value="true"></property> 
    <property name="reconnectOnException" value="true"></property> 
    <property name="sessionCacheSize" value="${jms.sessioncachesize}"></property> 
</bean> 

<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration"> 
    <property name="connectionFactory" ref="cachingConnectionFactory" /> 
    <property name="transacted" value="false" /> 
    <property name="concurrentConsumers" value="${jms.concurrentConsumer}" /> 
    <property name="maxConcurrentConsumers" value="${jms.max.concurrentConsumer}" /> 
    <property name="preserveMessageQos" value="true" /> 
    <property name="timeToLive" value="${jms.timeToLive}" /> 
</bean> 

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"> 
    <property name="configuration" ref="jmsConfig" /> 
</bean> 

Answers

3

You can explicitly set concurrentConsumers/maxConcurrentConsumers to "1" on any topic consumer.

from("activemq:topic:myTopic?concurrentConsumers=1&maxConcurrentConsumers=1")... 

Alternatively, set the JmsConfiguration concurrentConsumers/maxConcurrentConsumers properties to "1", then explicitly enable concurrent consumption on queues as needed.

from("activemq:queue:myQueue?maxConcurrentConsumers=5")... 
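For routes defined in Spring XML, as in the question, the same per-endpoint overrides might look like the sketch below. The route ids and the `log:` targets are placeholders that are not part of the original configuration. Note that `&` in an endpoint URI has to be written as `&amp;` inside an XML attribute.

```xml
<!-- Topic: pin the consumer count to 1 so each message is handled once by this route -->
<camel:route id="topicSingleConsumer"> <!-- hypothetical route id -->
    <camel:from uri="activemq:topic:myTopic?concurrentConsumers=1&amp;maxConcurrentConsumers=1" />
    <camel:to uri="log:topicMessages" /> <!-- placeholder target -->
</camel:route>

<!-- Queue: opt in to concurrency on the endpoint that needs it -->
<camel:route id="queueConcurrentConsumers"> <!-- hypothetical route id -->
    <camel:from uri="activemq:queue:myQueue?maxConcurrentConsumers=5" />
    <camel:to uri="log:queueMessages" /> <!-- placeholder target -->
</camel:route>
```

Options given on the endpoint URI take precedence over the values set on the shared JmsConfiguration, which is what makes either default workable.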

Also, you can use Virtual Topics to consume topic messages concurrently without getting duplicates (highly recommended over traditional topics).
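A minimal sketch of the Virtual Topics approach, assuming the broker uses ActiveMQ's default virtual destination naming (`VirtualTopic.>` for the topic and `Consumer.<name>.VirtualTopic.<topic>` for the consumer queues). The destination name `stockQuotes`, the consumer name `responder`, the `direct:` endpoint, and the route ids are illustrative only.

```xml
<!-- Producer side: publish to the virtual topic as if it were an ordinary topic -->
<camel:route id="stockQuotePublisher"> <!-- hypothetical route id -->
    <camel:from uri="direct:publishStockQuote" /> <!-- hypothetical entry point -->
    <camel:to uri="activemq:topic:VirtualTopic.stockQuotes" />
</camel:route>

<!-- Consumer side: read from the queue the broker feeds for this logical subscriber.
     Because it is a queue, concurrent consumers share the messages instead of each getting a copy. -->
<camel:route id="stockQuoteConsumer"> <!-- hypothetical route id -->
    <camel:from uri="activemq:queue:Consumer.responder.VirtualTopic.stockQuotes?concurrentConsumers=5" />
    <camel:to uri="bean:stockQuoteResponder?method=saveStockQuote" />
</camel:route>
```

Each distinct consumer name gets its own queue and therefore its own copy of every message, so publish/subscribe semantics are kept across applications while a single application can still scale out its consumers.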

0

The solution I ended up using was to create a separate jmsConfig/ActiveMQ configuration block for topics.

The full configuration is shown below:

<!-- This is appropriate for consuming Queues, but not topics. For topics, use 
jmsTopicConfig/activemqTopics --> 
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration"> 
    <property name="connectionFactory" ref="cachingConnectionFactory" /> 
    <property name="transacted" value="false" /> 
    <property name="concurrentConsumers" value="${jms.concurrentConsumer}" /> 
    <property name="maxConcurrentConsumers" value="${jms.max.concurrentConsumer}" /> 
    <property name="preserveMessageQos" value="true" /> 
    <property name="timeToLive" value="${jms.timeToLive}" /> 
</bean> 

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"> 
    <property name="configuration" ref="jmsConfig" /> 
</bean> 

<!-- This config limits to a single concurrent consumer. This config is appropriate for 
consuming Topics, not Queues. --> 
<bean id="jmsTopicConfig" class="org.apache.camel.component.jms.JmsConfiguration"> 
    <property name="connectionFactory" ref="cachingConnectionFactory" /> 
    <property name="transacted" value="false" /> 
    <property name="concurrentConsumers" value="1" /> 
    <property name="maxConcurrentConsumers" value="1" /> 
    <property name="preserveMessageQos" value="true" /> 
    <property name="timeToLive" value="${jms.timeToLive}" /> 
</bean> 

<bean id="activemqTopics" class="org.apache.activemq.camel.component.ActiveMQComponent"> 
    <property name="configuration" ref="jmsTopicConfig" /> 
</bean> 

Then, in the Camel route, consume the topic through the activemqTopics bean, as follows:

<camel:route id="myTopicResponder"> 
    <camel:from uri="activemqTopics:topic:stockQuotes?concurrentConsumers=1" /> 
    <camel:to uri="bean:stockQuoteResponder?method=saveStockQuote"/> 
</camel:route>
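For contrast, a queue route under this setup would keep using the original `activemq` component and so inherit the concurrent consumer settings from `jmsConfig`. This is only a sketch: the route id, the queue name `stockOrders`, and the `log:` target are hypothetical.

```xml
<!-- Uses the "activemq" bean, so concurrentConsumers/maxConcurrentConsumers come from jmsConfig -->
<camel:route id="myQueueWorker"> <!-- hypothetical route id -->
    <camel:from uri="activemq:queue:stockOrders" /> <!-- hypothetical queue name -->
    <camel:to uri="log:stockOrders" /> <!-- placeholder target -->
</camel:route>
```

The component prefix in the URI (`activemq:` versus `activemqTopics:`) is what selects which JmsConfiguration applies to the route.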