2014-10-09 130 views
4

我的基本问题是,我只能在任何1小时的时间内从我的一个队列(跨所有机器)处理7000条消息。我没有看到用骆驼或activemq做到这一点,所以我采取了自己的路线停止/启动逻辑。我看到了很多方法来做到这一点,我尝试了其中的一些方法(只是遇到问题)。用activemq“骆驼”暂停路线的正确方法是什么?

  1. camelContext.stopRoute(route):这工作,因为信息停止正在处理,但是当我打电话camelContext.startRoute(route),它泄漏TCP连接,最终导致的ActiveMQ服务器打了极限而死亡。
  2. camelContext.suspendRoute(route):这也会停止正在处理的邮件,并且不会泄漏连接,但它似乎会杀死当我拨打camelContext.resumeRoute(route)时不会重新激活的活动使用者(在管理面板中可见)。我认为,即使我恢复,最终可能导致没有消息从队列中被处理掉。
  3. 实施自定义RoutePolicy。公平地说,我还没有尝试过,但似乎它会成为我根据我上面选择的暂停方法而遇到的同样问题的牺牲品。

有没有解决这个问题的方法,我还没有遇到过?

+0

客户的路线政策将是我会这样做的方式。当你说“按照我上面选择的暂停方法”时,我认为你的路由策略只是叫做stopConsumer()和startConsumer(),就像ThrottlingInflightRoutePolicy一样。 – 2014-10-10 09:11:04

+0

“throller”会有帮助吗? http://camel.apache.org/throttler.html – vikingsteve 2014-10-10 09:43:17

+0

@vikingsteve我需要在所有正在处理该队列的机器上的队列级别进行节流。您提到的节流只能用于限制单台机器的处理。 – Denise 2014-10-10 10:50:07

回答

3

而不是停止路线,我会建议使用Throttler EIP

from("jms:queue:inbox") 
    .throttle(7000) 
    .timePeriodMillis(1000*60*60) 
    .to("log:result", "mock:result"); 

上面的示例将节流消息上接收到的jms:queue:inbox被发送到mock:result确保最大7000个的消息,在任何1小时的窗口被发送之前。

或者,更细粒度的控制,你可以定义一个限制路由策略如图骆驼的throttling example

<route routePolicyRef="myPolicy"> 
    <from uri="jms:queue:inbox"/> 
    <transacted/> 
    <to uri="log:+++JMS +++?groupSize=100"/> 
    <to ref="foo"/> 
</route> 

节流警察的定义如下:

<bean id="myPolicy" class="org.apache.camel.impl.ThrottlingInflightRoutePolicy"> 
    <property name="scope" value="Context"/> 
    <!-- when we hit > 20 inflight exchanges then kick in and suspend the routes --> 
    <property name="maxInflightExchanges" value="20"/> 
    <!-- when we hit lower than 10% of the max = 2 then kick in and resume the routes the default percentage is 70% but in this demo we want a low value --> 
    <property name="resumePercentOfMax" value="10"/> 
    <!-- output throttling activity at WARN level --> 
    <property name="loggingLevel" value="WARN"/> 
</bean> 

编辑1:

如果您需要全局节流,那么您可以先让一个消费者读取消息,如上所述限制所有消息,然后将它们重新发送到另一个队列,并让它们重新读取并处理它们的分布式消费者。

编辑2:

或者,您也可以实现自己的ThrottlingInflightRoutePolicy访问中央数据库保存处理信息。这样,你不需要一个“单节点主节气门”。但是,数据库也可能是单点故障。

+0

限制策略是否可以跨上下文/机器应用?否则,可能会出现单点故障。 – vikingsteve 2014-10-10 11:53:58

+0

@vikingsteve感谢您的问题。看我的*编辑2 *如何处理。 – 2014-10-10 12:15:50

+0

@彼得有趣。我没有考虑让其中一位消费者进行限制,但是这会引发很多管理消费者不对称的开销。 我不知道ThrottlingInflightRoutePolicy如何实现与我在问题中提到的camelContext.startRoute()或camelContext.resumeRoute()不同的停止/启动路由,但是由于每个人都强烈支持它,所以我会给它一个镜头! – Denise 2014-10-10 13:51:54

0

彼得的问题得到了最好的答案,但我最终的结果是延长了ThrottlingInflightRoutePolicy,关于它是如何工作的没有很好的解释,所以我想我会对这个问题进行一些注释并说明我是如何解决问题的。

public class MyRoutePolicy extends RoutePolicySupport implements CamelContextAware { 

    private CamelContext camelContext; 
    private final Lock lock = new ReentrantLock(); 
    private ContextScopedEventNotifier eventNotifier; 

    @Override 
    public final void setCamelContext(final CamelContext camelContext) { 
     this.camelContext = camelContext; 
    } 

    @Override 
    public final CamelContext getCamelContext() { 
     return this.camelContext; 
    } 

    @Override 
    public final void onExchangeDone(final Route route, final Exchange exchange) { 
     throttle(route); 
    } 

    private void throttle(final Route route) { 
     // this works the best when this logic is executed when the exchange is done 
     Consumer consumer = route.getConsumer(); 

     boolean stop = isRouteMarkedForSuspension(route.getId()) && ((JmsConsumer) route.getConsumer()).isStarted(); 
     if (stop) { 
      try { 
       lock.lock(); 
       stopConsumer(consumer); 
      } catch (Exception e) { 
       handleException(e); 
      } finally { 
       lock.unlock(); 
      } 
     } 

     // reload size in case a race condition with too many at once being invoked 
     // so we need to ensure that we read the most current size and start the consumer if we are already to low 
     boolean start = !isRouteMarkedForSuspension(route.getId()) && ((JmsConsumer) route.getConsumer()).isSuspended(); 
     if (start) { 
      try { 
       lock.lock(); 
       startConsumer(consumer); 
      } catch (Exception e) { 
       handleException(e); 
      } finally { 
       lock.unlock(); 
      } 
     } 
    } 

    @Override 
    protected final void doStart() throws Exception { 
     ObjectHelper.notNull(camelContext, "CamelContext", this); 
     eventNotifier = new ContextScopedEventNotifier(); 
     // must start the notifier before it can be used 
     ServiceHelper.startService(eventNotifier); 
     // we are in context scope, so we need to use an event notifier to keep track 
     // when any exchanges is done on the camel context. 
     // This ensures we can trigger accordingly to context scope 
     camelContext.getManagementStrategy().addEventNotifier(eventNotifier); 
    } 

    @Override 
    protected final void doStop() throws Exception { 
     ObjectHelper.notNull(camelContext, "CamelContext", this); 
     camelContext.getManagementStrategy().removeEventNotifier(eventNotifier); 
    } 

    private class ContextScopedEventNotifier extends EventNotifierSupport { 

     @Override 
     public void notify(final EventObject event) throws Exception { 
      for (Route route : camelContext.getRoutes()) { 
       throttle(route); 
      } 
     } 

     @Override 
     public boolean isEnabled(final EventObject event) { 
      return event instanceof ExchangeCompletedEvent; 
     } 

     @Override 
     protected void doStart() throws Exception { 
      // noop 
     } 

     @Override 
     protected void doStop() throws Exception { 
      // noop 
     } 

     @Override 
     public String toString() { 
      return "ContextScopedEventNotifier"; 
     } 
    } 
} 

所以我加入了RoutePolicy上述所有我的路线,像这样:

from(uri).routePolicy(routePolicy).process(runner); 

MyRoutePolicy是一个内部类和isRouteMarkedForSuspension在主类中定义。

throttle被击中在两个点:被处理后

  • 交换(消息)。这对于确定消费者是否应该暂停很有用。
  • 通过ContextScopedEventNotifier通知事件。这对于确定消费者是否应该恢复很有用。