在描述的场景,我会派劈裂消息的路由与聚合器(姑且称之为“AggregationRoute”),该公司聚集战略实现PreCompletionAwareAggregationStrategy(你已经在使用它的方式,我猜)。 然后,当分割结束时,将AGGREGATION_COMPLETE_ALL_GROUPS标头设置为true并将其发送到AggregationRoute。此交换仅用作完成所有聚合组的信号。
实施例:
...
.split(body()).streaming()
.to("direct:aggregationRoute")
.end()
.setHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS,constant(true))
.to("direct:aggregationRoute");
from("direct:aggregationRoute")
.aggregate([your correlation expression]), myAggregationStrategy)
...
另一种替代方法是使用AggregateController通过调用其方法forceCompletionOfAllGroups()来结束所有组的聚集:
AggregateController aggregateController = new DefaultAggregateController();
from(...)
...
.split(body()).streaming()
.aggregate([correlation expression], aggregationStrategy).aggregateController(aggregateController)
...
// Do what you need to do with the aggregated exchange
...
.end()
.end()
.bean(aggregateController, "forceCompletionOfAllGroups")
我已经登录了票,看看是否可以通过OOTB使事情变得更加简单:https://issues.apache.org/jira/browse/CAMEL-10474 –
您现在可以通过CAMEL-10474来做到这一点。但是我已经记录了另一张票,以使这更简单:https://issues.apache.org/jira/browse/CAMEL-12296 –