而不是停止路线,我会建议使用Throttler EIP。
from("jms:queue:inbox")
.throttle(7000)
.timePeriodMillis(1000*60*60)
.to("log:result", "mock:result");
上面的示例将节流消息上接收到的jms:queue:inbox
被发送到mock:result
确保最大7000个的消息,在任何1小时的窗口被发送之前。
或者,更细粒度的控制,你可以定义一个限制路由策略如图骆驼的throttling example:
<route routePolicyRef="myPolicy">
<from uri="jms:queue:inbox"/>
<transacted/>
<to uri="log:+++JMS +++?groupSize=100"/>
<to ref="foo"/>
</route>
节流警察的定义如下:
<bean id="myPolicy" class="org.apache.camel.impl.ThrottlingInflightRoutePolicy">
<property name="scope" value="Context"/>
<!-- when we hit > 20 inflight exchanges then kick in and suspend the routes -->
<property name="maxInflightExchanges" value="20"/>
<!-- when we hit lower than 10% of the max = 2 then kick in and resume the routes the default percentage is 70% but in this demo we want a low value -->
<property name="resumePercentOfMax" value="10"/>
<!-- output throttling activity at WARN level -->
<property name="loggingLevel" value="WARN"/>
</bean>
编辑1:
如果您需要全局节流,那么您可以先让一个消费者读取消息,如上所述限制所有消息,然后将它们重新发送到另一个队列,并让它们重新读取并处理它们的分布式消费者。
编辑2:
或者,您也可以实现自己的ThrottlingInflightRoutePolicy
访问中央数据库保存处理信息。这样,你不需要一个“单节点主节气门”。但是,数据库也可能是单点故障。
客户的路线政策将是我会这样做的方式。当你说“按照我上面选择的暂停方法”时,我认为你的路由策略只是叫做stopConsumer()和startConsumer(),就像ThrottlingInflightRoutePolicy一样。 – 2014-10-10 09:11:04
“throller”会有帮助吗? http://camel.apache.org/throttler.html – vikingsteve 2014-10-10 09:43:17
@vikingsteve我需要在所有正在处理该队列的机器上的队列级别进行节流。您提到的节流只能用于限制单台机器的处理。 – Denise 2014-10-10 10:50:07