2016-11-11 79 views
0

我使用的是Apache Camel并获取一个大文件以进行输入,我必须逐行处理。内容已经排序,我必须使用相同的关联关键字聚合所有连续的行。如果相关键发生变化,则必须完成前一个聚合。如果文件结束,最后的聚合也已完成。 我有一些限制: - 由于传入文件相当大,我们希望以流式处理它。 - 因为结果赋予同步端点,所以我不想使用超时完成谓词。否则,我将失去调节数据源消耗速度的背压,并且交换将积累在AggregateProcessor的超时映射和聚合存储库中。只汇总具有相同关联密钥的连续交换

PreCompletionAwareAggregationStrategy看起来像一个有希望的解决方案,但事实证明,最后一个聚合将不会完成,直到下一个文件到达。如果我在preComplete中使用CamelSplitComplete属性,则最后一个聚合完成,但没有最后一个传入交换。相反,这最后的交换将被添加到下一个文件到达的内容。

因此,目前我很迷茫,找不到过分难看的解决方案。

+0

我已经登录了票,看看是否可以通过OOTB使事情变得更加简单:https://issues.apache.org/jira/browse/CAMEL-10474 –

+0

您现在可以通过CAMEL-10474来做到这一点。但是我已经记录了另一张票,以使这更简单:https://issues.apache.org/jira/browse/CAMEL-12296 –

回答

0

在描述的场景,我会派劈裂消息的路由与聚合器(姑且称之为“AggregationRoute”),该公司聚集战略实现PreCompletionAwareAggregationStrategy(你已经在使用它的方式,我猜)。 然后,当分割结束时,将AGGREGATION_COMPLETE_ALL_GROUPS标头设置为true并将其发送到AggregationRoute。此交换仅用作完成所有聚合组的信号。

实施例:


    ... 
    .split(body()).streaming() 
     .to("direct:aggregationRoute") 
    .end() 
    .setHeader(Exchange.AGGREGATION_COMPLETE_ALL_GROUPS,constant(true)) 
    .to("direct:aggregationRoute"); 

from("direct:aggregationRoute") 
    .aggregate([your correlation expression]), myAggregationStrategy) 
    ... 

另一种替代方法是使用AggregateController通过调用其方法forceCompletionOfAllGroups()来结束所有组的聚集:


AggregateController aggregateController = new DefaultAggregateController(); 

from(...) 
    ... 
    .split(body()).streaming() 
     .aggregate([correlation expression], aggregationStrategy).aggregateController(aggregateController) 
      ... 
      // Do what you need to do with the aggregated exchange 
      ... 
     .end() 
    .end() 
    .bean(aggregateController, "forceCompletionOfAllGroups") 
相关问题