查看下面TestAggregator
的子类化AbstractAggregator的示例。
import org.mule.DefaultMuleEvent;
import org.mule.DefaultMuleMessage;
import org.mule.api.MuleContext;
import org.mule.api.MuleEvent;
import org.mule.api.store.ObjectStoreException;
import org.mule.api.transformer.TransformerException;
import org.mule.routing.AbstractAggregator;
import org.mule.routing.AggregationException;
import org.mule.routing.EventGroup;
import org.mule.routing.correlation.CollectionCorrelatorCallback;
import org.mule.routing.correlation.EventCorrelatorCallback;
import org.mule.util.concurrent.ThreadNameHelper;
import java.util.Iterator;
public class TestAggregator extends AbstractAggregator
{
@Override
protected EventCorrelatorCallback getCorrelatorCallback(MuleContext muleContext)
{
return new CollectionCorrelatorCallback(muleContext,false,storePrefix)
{
@Override
public MuleEvent aggregateEvents(EventGroup events) throws AggregationException
{
StringBuffer buffer = new StringBuffer(128);
try
{
for (Iterator<MuleEvent> iterator = events.iterator(); iterator.hasNext();)
{
MuleEvent event = iterator.next();
try
{
buffer.append(event.transformMessageToString());
}
catch (TransformerException e)
{
throw new AggregationException(events, null, e);
}
}
}
catch (ObjectStoreException e)
{
throw new AggregationException(events,null,e);
}
logger.debug("event payload is: " + buffer.toString());
return new DefaultMuleEvent(new DefaultMuleMessage(buffer.toString(), muleContext), events.getMessageCollectionEvent());
}
};
}
}
谢谢!我会试试这个。我应该重写CollectionCorrelatorCallback.shouldAggregateEvents()来提供自己的逻辑,还是建议创建自己的实现EventCorrelatorCallback的类? – awynne
你宁愿实现你自己的EventCorrelatorCallback,因为你根本不依赖消息计数(我理解为:你不会依赖event.getMessage()。getCorrelationGroupSize()来确定要聚合的事件的数量)。 –
我发现EventCorrelatorCallback是由EventCorrelator调用的,它假定按照关联ID分组事件。所以我认为我需要通过“人为地”将它设置在传入消息的某处来使用correlationId。其他选择是维护我自己的数据结构来保存事件。有没有更好的办法? – awynne