我有一个Spring集成DSL流,它从一个rest API中提取数据,将其转换并发送到不同的rest API。弹簧集成队列错误处理
获取数据后,它会将消息发送到队列通道,进行其余处理。当队列正在工作时,原始线程将进入并获取更多数据。
我遇到的问题是,从队列中抛出的任何错误在处理完所有数据之后才会处理,但是我希望它立即停止处理并立即抛出错误,因为整个过程可能会花费很长一段时间,但我希望它停止发现第一个错误。
网关:
@MessagingGateway(errorChannel = "syncErrorChannel")
@Service
public interface CrmGateway {
@Gateway(requestChannel = "departmentSyncInput", replyChannel = "departmentSyncOutput")
@Payload("new String()")
Object syncDepartments();
}
流量:
/**
* Fetches data from the source api and passes it on to the split channel to process it If the
* response indicates it has more data to fetch then it is also loaded
*
* @return {@link IntegrationFlow}
*/
@Bean
IntegrationFlow sync() {
return IntegrationFlows
.from("departmentSyncInput")
.handle(this::fetchDomain)
.enrichHeaders(s -> s.headerExpressions(h -> h.put("nextLink", "payload.getNext()")))
.routeToRecipients(r -> r
.recipient("departmentSplitChannel")
.recipient(
"departmentSyncInput",
p -> p.getPayload() instanceof Wrapper
&& ((Wrapper) p.getPayload()).getNext() != null
))
.get();
}
/**
* Split data from the api into individual models and send them to the target service
*
* @return {@link IntegrationFlow}
*/
@Bean
IntegrationFlow split() {
return IntegrationFlows
.from("departmentSplitChannel")
.transform(Wrapper.class, Wrapper::getContent)
.split()
.channel(c -> c.executor(Executors.newScheduledThreadPool(100)))
.enrichHeaders(h -> h.header("errorChannel", "syncErrorChannel"))
.handle((payload, headers) -> log("Syncing", payload, payload))
.transform(Department.class, transformer)
// exception happens here
.handle(DepartmentDTO.class, (payload, headers) -> service.upsertDepartment(payload))
.handle((payload, headers) -> log("Synced", payload, payload))
.aggregate()
.get();
}
错误处理程序:
@Bean
IntegrationFlow errorHandler() {
return IntegrationFlows
.from("syncErrorChannel")
.handle(Exception.class, (payload, headers) -> {
payload.printStackTrace();
return payload;
})
.get();
}
我也使用IntegrationFlows.from("errorChannel")
具有相同的结果尝试。
我也尝试过使用Future
,它的行为相同,因此当我打电话给get()
时,出现错误,但这仍然在最后发生。
感谢您的任何帮助。
谢谢,这似乎已经做到了。使用'c - > c.executor()'与队列不同?这是我从XML转换而来的DSL,但我以前没有使用SI DSL。 –
好吧,从高处来看,它确实是一个队列,因为如果没有空闲线程来处理它,它会为执行程序提供一个任务,将它们存储在内部队列中。我在谈论的是QueueChannel针对投票特定的行为 –