1

在集成流程中,使用其默认策略的拆分将从列表中发布项目。该项目的处理可能会失败。我想处理这个错误,并将一个新的消息与前一个映射信息(除了一个自定义错误头部)一起发送到正常的消息传递通道。如何在Spring Integration Java DSL中自定义消息聚合逻辑

在聚合器中,我想定制聚合逻辑来生成其他类型的消息,包括失败进程的计数和未失败消息的结果。

在这里,我解释我是如何发送错误消息的标题:

@Bean 
public IntegrationFlow socialMediaErrorFlow() { 
    return IntegrationFlows.from("socialMediaErrorChannel") 
      .wireTap(sf -> sf.handle("errorService", "handleException")) 
      .<MessagingException>handle((p, h) 
       -> MessageBuilder.withPayload(Collections.<CommentEntity>emptyList()) 
        .copyHeaders(p.getFailedMessage().getHeaders()) 
        .setHeader("ERROR", true) 
        .build() 
      ) 
      .channel("directChannel_1") 
      .get(); 
} 

我想要的聚合产生这种类型的对象:

public class Result { 

    private Integer totalTask; 
    private Integer taskFailed; 
    private List<CommentEntity> comments; 

} 

我应该如何处理这个?

在此先感谢。

由于阿尔乔姆的帮助下,我做这个实现:

.aggregate(a -> a.outputProcessor(new MessageGroupProcessor() { 
     @Override 
     public Object processMessageGroup(MessageGroup mg) { 
      Integer failedTaskCount = 0; 
      Integer totalTaskCount = mg.getMessages().size(); 
      List<CommentEntity> comments = new ArrayList<>(); 
      for(Message<?> message: mg.getMessages()){ 
       if(message.getHeaders().containsKey("ERROR")) 
        failedTaskCount++; 
       else 
          comments.addAll((List<CommentEntity>)message.getPayload()); 
     } 

    return new IterationResult(totalTaskCount, failedTaskCount, comments); 

    } 
})) 

回答

1

AggregatorSpecoutputProcessor属性:

/** 
* A processor to determine the output message from the released group. Defaults to a message 
* with a payload that is a collection of payloads from the input messages. 
* @param outputProcessor the processor. 
* @return the aggregator spec. 
*/ 
public AggregatorSpec outputProcessor(MessageGroupProcessor outputProcessor) { 

在这里,您可以提供自己的自定义逻辑来解析在所有消息为他们建立你的Result

从测试情况下的示例:

.aggregate(a -> a.outputProcessor(g -> g.getMessages() 
         .stream() 
         .map(m -> (String) m.getPayload()) 
         .collect(Collectors.joining(" ")))) 

Cafe Demo样品:

.aggregate(aggregator -> aggregator 
     .outputProcessor(g -> 
        new Delivery(g.getMessages() 
           .stream() 
           .map(message -> (Drink) message.getPayload()) 
           .collect(Collectors.toList()))) 
     .correlationStrategy(m -> ((Drink) m.getPayload()).getOrderNumber())) 
相关问题