2016-12-04 163 views
2

我有一个Spring集成DSL流,它从一个rest API中提取数据,将其转换并发送到不同的rest API。弹簧集成队列错误处理

获取数据后,它会将消息发送到队列通道,进行其余处理。当队列正在工作时,原始线程将进入并获取更多数据。

我遇到的问题是,从队列中抛出的任何错误在处理完所有数据之后才会处理,但是我希望它立即停止处理并立即抛出错误,因为整个过程可能会花费很长一段时间,但我希望它停止发现第一个错误。

网关:

@MessagingGateway(errorChannel = "syncErrorChannel") 
@Service 
public interface CrmGateway { 
    @Gateway(requestChannel = "departmentSyncInput", replyChannel = "departmentSyncOutput") 
    @Payload("new String()") 
    Object syncDepartments(); 
} 

流量:

/** 
    * Fetches data from the source api and passes it on to the split channel to process it If the 
    * response indicates it has more data to fetch then it is also loaded 
    * 
    * @return {@link IntegrationFlow} 
    */ 
    @Bean 
    IntegrationFlow sync() { 
    return IntegrationFlows 
     .from("departmentSyncInput") 
     .handle(this::fetchDomain) 
     .enrichHeaders(s -> s.headerExpressions(h -> h.put("nextLink", "payload.getNext()"))) 
     .routeToRecipients(r -> r 
     .recipient("departmentSplitChannel") 
     .recipient(
      "departmentSyncInput", 
      p -> p.getPayload() instanceof Wrapper 
      && ((Wrapper) p.getPayload()).getNext() != null 
     )) 
     .get(); 
    } 

    /** 
    * Split data from the api into individual models and send them to the target service 
    * 
    * @return {@link IntegrationFlow} 
    */ 
    @Bean 
    IntegrationFlow split() { 
    return IntegrationFlows 
     .from("departmentSplitChannel") 
     .transform(Wrapper.class, Wrapper::getContent) 
     .split() 
     .channel(c -> c.executor(Executors.newScheduledThreadPool(100))) 
     .enrichHeaders(h -> h.header("errorChannel", "syncErrorChannel")) 
     .handle((payload, headers) -> log("Syncing", payload, payload)) 
     .transform(Department.class, transformer) 
     // exception happens here 
     .handle(DepartmentDTO.class, (payload, headers) -> service.upsertDepartment(payload)) 
     .handle((payload, headers) -> log("Synced", payload, payload)) 
     .aggregate() 
     .get(); 
    } 

错误处理程序:

@Bean 
    IntegrationFlow errorHandler() { 
    return IntegrationFlows 
     .from("syncErrorChannel") 
     .handle(Exception.class, (payload, headers) -> { 
     payload.printStackTrace(); 
     return payload; 
     }) 
     .get(); 
    } 

我也使用IntegrationFlows.from("errorChannel")具有相同的结果尝试。

我也尝试过使用Future,它的行为相同,因此当我打电话给get()时,出现错误,但这仍然在最后发生。

感谢您的任何帮助。

回答

0

您的流程中没有queue频道定义,但我想你的意思是.channel(c -> c.executor())。如果你在这个问题上分享日志,情况会更好。

我可以说你试图覆盖errorChannel标题,在Gateway的情况下是TemporaryReplyChannel

因此,错误发送到网关的进程并在split的情况下崩溃。

我建议你用h.header("errorChannel", "syncErrorChannel", true)来尝试覆盖该标题。

+0

谢谢,这似乎已经做到了。使用'c - > c.executor()'与队列不同?这是我从XML转换而来的DSL,但我以前没有使用SI DSL。 –

+0

好吧,从高处来看,它确实是一个队列,因为如果没有空闲线程来处理它,它会为执行程序提供一个任务,将它们存储在内部队列中。我在谈论的是QueueChannel针对投票特定的行为 –