2011-10-01 67 views
1

在mule 3.x中创建完全自定义聚合器的建议方法是什么?通过完全自定义的,我的意思是按照我自己的逻辑,不使用相关标识,消息计数等如何在Mule中创建自定义聚合器?

在mulesoft网站上的文件已经过时,说要使用AbstractEventAggregator不中3.X存在:

http://www.mulesoft.org/documentation/display/MULE3USER/Message+Splitting+and+Aggregatio

进一步挖掘,它看起来像这类已3.x中被重命名为AbstractAggregator:

http://www.mulesoft.org/docs/site/3.2.0/apidocs/org/mule/routing/AbstractAggregator.html

但是,有没有显示如何使用这个的例子。上面第一个链接中描述的LoanBroker示例实际上使用了一个关联聚合器(在2.x示例中,我认为这是文档所指的内容)。

有一次,有一个抽象类有抽象方法shouldAggregate和doAggregate。这是我想要扩展的类。

回答

4

查看下面TestAggregator的子类化AbstractAggregator的示例。

import org.mule.DefaultMuleEvent; 
import org.mule.DefaultMuleMessage; 
import org.mule.api.MuleContext; 
import org.mule.api.MuleEvent; 
import org.mule.api.store.ObjectStoreException; 
import org.mule.api.transformer.TransformerException; 
import org.mule.routing.AbstractAggregator; 
import org.mule.routing.AggregationException; 
import org.mule.routing.EventGroup; 
import org.mule.routing.correlation.CollectionCorrelatorCallback; 
import org.mule.routing.correlation.EventCorrelatorCallback; 
import org.mule.util.concurrent.ThreadNameHelper; 

import java.util.Iterator; 

public class TestAggregator extends AbstractAggregator 
{ 
    @Override 
    protected EventCorrelatorCallback getCorrelatorCallback(MuleContext muleContext) 
    { 
     return new CollectionCorrelatorCallback(muleContext,false,storePrefix) 
     { 
      @Override 
      public MuleEvent aggregateEvents(EventGroup events) throws AggregationException 
      { 
       StringBuffer buffer = new StringBuffer(128); 

       try 
       { 
        for (Iterator<MuleEvent> iterator = events.iterator(); iterator.hasNext();) 
        { 
         MuleEvent event = iterator.next(); 
         try 
         { 
          buffer.append(event.transformMessageToString()); 
         } 
         catch (TransformerException e) 
         { 
          throw new AggregationException(events, null, e); 
         } 
        } 
       } 
       catch (ObjectStoreException e) 
       { 
        throw new AggregationException(events,null,e); 
       } 

       logger.debug("event payload is: " + buffer.toString()); 
       return new DefaultMuleEvent(new DefaultMuleMessage(buffer.toString(), muleContext), events.getMessageCollectionEvent()); 
      } 
     }; 
    } 
} 
+0

谢谢!我会试试这个。我应该重写CollectionCorrelatorCallback.shouldAggregateEvents()来提供自己的逻辑,还是建议创建自己的实现EventCorrelatorCallback的类? – awynne

+0

你宁愿实现你自己的EventCorrelatorCallback,因为你根本不依赖消息计数(我理解为:你不会依赖event.getMessage()。getCorrelationGroupSize()来确定要聚合的事件的数量)。 –

+0

我发现EventCorrelatorCallback是由EventCorrelator调用的,它假定按照关联ID分组事件。所以我认为我需要通过“人为地”将它设置在传入消息的某处来使用correlationId。其他选择是维护我自己的数据结构来保存事件。有没有更好的办法? – awynne