2015-05-14 127 views
0

我正在通过 Camel 路由以每秒 1000 条消息的速度向 ActiveMQ 队列发送消息。Camel 上下文如下(问题标题:ActiveMQ 消息的快速消费):

<beans xmlns="http://www.springframework.org/schema/beans" 
     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
     xmlns:camel="http://camel.apache.org/schema/spring" 
     xsi:schemaLocation=" 
     http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd 
     http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd 
     http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> 


<!--
  Camel context "MsgServlet".

  The queue endpoints use the "amq:" scheme so that they resolve to the
  ActiveMQComponent bean with id "amq" declared in this file. Camel looks a
  component up by the URI scheme name, so "activemq:" would not match that
  bean and the pooledCF / AMQCF settings (vm:// transport, useAsyncSend,
  prefetch) would not be applied to these routes.
-->
<camelContext id="MsgServlet" xmlns="http://camel.apache.org/schema/spring">

  <!-- Timer-driven route: every 25s call the DemoServlet URL, then InitConfig.process. -->
  <route id="ConfigComponent">
    <from uri="timer:foo?period=25s"/>
    <to uri="http://10.53.138.245:8080/demo/DemoServlet?component=1"/>
    <to uri="bean:InitConfig?method=process"/>
  </route>

  <!-- HTTP ingest route: unmarshal the JSON body with Jackson, run MsgProcessor.process,
       then hand the message to queue "inbox". -->
  <route id="servletToProcessor">
    <from uri="jetty:http://10.53.138.100:10666/mytestservice"/>
    <unmarshal>
      <json library="Jackson"/>
    </unmarshal>
    <to uri="bean:MsgProcessor?method=process"/>
    <!-- InOnly: send to the queue without waiting for a JMS reply. -->
    <to uri="amq:queue:inbox" pattern="InOnly"/>
  </route>

  <!-- Queue consumer route: each message from "inbox" is passed to ESPProcessor.process.
       NOTE(review): to drain the queue faster, consider
       "amq:queue:inbox?concurrentConsumers=N" after confirming that ESPProcessor
       is thread-safe and that message ordering does not matter. -->
  <route id="inToOutRoute">
    <from uri="amq:queue:inbox"/>
    <to uri="bean:ESPProcessor?method=process"/>
  </route>

</camelContext>

    <!-- Processor beans; the Camel routes in this file call them through "bean:<id>?method=process". -->
    <bean id="InitConfig" class="org.sap.camel.iot.example.InitConfig"/>

    <bean id="MsgProcessor" class="org.sap.camel.iot.example.MsgProcessor"/>

    <bean id="ESPProcessor" class="org.sap.camel.iot.example.ESPProcessor"/>

    <!-- Not referenced by any route visible in this file. -->
    <bean id="MMSProcessor" class="org.sap.camel.iot.example.MMSProcessor"/>

    <!-- Embedded ActiveMQ broker named "myBroker": persistence enabled with data under
         "activemq-data", JMX enabled, JVM shutdown hook disabled. -->
    <broker xmlns="http://activemq.apache.org/schema/core"
            id="broker"
            brokerName="myBroker"
            persistent="true"
            dataDirectory="activemq-data"
            useJmx="true"
            useShutdownHook="false">

        <transportConnectors>
            <!-- vm transport for intra-jvm communication -->
            <transportConnector name="vm" uri="vm://myBroker"/>
            <!-- tcp for external communication -->
            <transportConnector name="tcp" uri="tcp://0.0.0.0:61616"/>
        </transportConnectors>

        <!-- Per-destination policy for queue "inbox". -->
        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry queue="inbox"
                                 maxPageSize="1000"
                                 memoryLimit="100MB"
                                 queuePrefetch="2000"/>
                </policyEntries>
            </policyMap>
        </destinationPolicy>

    </broker>




    <!-- Camel ActiveMQ component registered under the id "amq"; its connections come from pooledCF. -->
    <bean id="amq" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="connectionFactory" ref="pooledCF"/>
    </bean>

<!-- Pooled connection factory wrapping AMQCF; Spring calls start() on init and stop() on
     destroy. maxConnections is set to 2. -->
<bean id="pooledCF"
      class="org.apache.activemq.pool.PooledConnectionFactory"
      init-method="start"
      destroy-method="stop">
    <property name="connectionFactory" ref="AMQCF"/>
    <property name="maxConnections" value="2"/>
</bean>

<!-- Underlying ActiveMQ connection factory. It connects over the in-JVM vm:// transport to
     the broker named "myBroker" (create=false, waitForStart=5000).
     NOTE(review): the credentials are hard-coded here; consider moving them to a
     property placeholder. -->
<bean id="AMQCF" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="vm://myBroker?create=false&amp;waitForStart=5000"/>
    <property name="userName" value="karaf"/>
    <property name="password" value="karaf"/>
    <property name="copyMessageOnSend" value="true"/>
    <property name="useAsyncSend" value="true"/>
    <!-- Same prefetch value (2000) as the queuePrefetch on the broker's "inbox" policy entry. -->
    <property name="prefetchPolicy.queuePrefetch" value="2000"/>
</bean>

</beans> 

我面临的问题是,清空队列需要很长时间。我需要做哪些配置更改才能尽快清空队列?

问候, MAYUR

回答

0

这可以通过增加 concurrentConsumers 的数量(即横向增加并发消费者)或使用线程模型(threads)来实现。Camel 文档表明,concurrentConsumers 是更快的选项。

下面这个讨论帖对此有很好的说明:Apache Camel concurrentConsumers vs threads