这是我早期问题Spring Integration File reading的延续。 总之,我有一个fileIn通道(一个队列),然后是一个处理文件的ServiceActivator和一个用于保存文件的出站适配器。ServiceActivators的Spring Integration并发性
我想引入并发性来处理多线程中的消息。我使用java DSL(但不是Java8)。我能够用下面的方式做到这一点...
@Bean
public MessageChannel fileInChannel() {
return MessageChannels.queue("fileIn").get();
}
@Bean
public IntegrationFlow fileProcessingFlow() {
return IntegrationFlows.from(fileInChannel())
.handle(myFileProcessor, "processFile",
new Consumer<GenericEndpointSpec<ServiceActivatingHandler>>() {
@Override
public void accept(GenericEndpointSpec<ServiceActivatingHandler> t) {
t.poller(Pollers.fixedRate(100).maxMessagesPerPoll(1).taskExecutor(Executors.newCachedThreadPool()));
}
})
.handle(Files.outboundAdapter(new File(outDir)).autoCreateDirectory(true).get())
.get();
}
这工作!我也试过以下
public IntegrationFlow fileProcessingFlow() {
return IntegrationFlows.from(fileInChannel())
.channel(MessageChannels.executor(Executors.newCachedThreadPool()))
.handle(myFileProcessor)
.handle(Files.outboundAdapter(new File(outDir)).autoCreateDirectory(true).get())
.get();
}
这也行!我不知道它只是一种风格,或者一种方法比另一种更好。如果是这样,哪种方法更好。其次,在上述情况下,“文件写入”(即最后一步)是顺序的还是将在不同的线程中工作。如果我还需要并发性,我应该在句柄(fileProcessor)和句柄(outBoundAdapter)之间引入另一个taskExecutor通道吗? 最终outboundadapter将是一个远程文件S3适配器,因此问题