2016-11-25 132 views
1

这是我早期问题Spring Integration File reading的延续。 总之,我有一个fileIn通道(一个队列),然后是一个处理文件的ServiceActivator和一个用于保存文件的出站适配器。ServiceActivators的Spring Integration并发性

我想引入并发性来处理多线程中的消息。我使用java DSL(但不是Java8)。我能够用下面的方式做到这一点...

@Bean 
public MessageChannel fileInChannel() { 
    return MessageChannels.queue("fileIn").get(); 
} 

@Bean 
public IntegrationFlow fileProcessingFlow() { 
    return IntegrationFlows.from(fileInChannel()) 
      .handle(myFileProcessor, "processFile", 
        new Consumer<GenericEndpointSpec<ServiceActivatingHandler>>() { 
         @Override 
         public void accept(GenericEndpointSpec<ServiceActivatingHandler> t) { 
          t.poller(Pollers.fixedRate(100).maxMessagesPerPoll(1).taskExecutor(Executors.newCachedThreadPool())); 
         } 
        }) 
      .handle(Files.outboundAdapter(new File(outDir)).autoCreateDirectory(true).get()) 
      .get(); 
} 

这工作!我也试过以下

public IntegrationFlow fileProcessingFlow() { 
    return IntegrationFlows.from(fileInChannel()) 
      .channel(MessageChannels.executor(Executors.newCachedThreadPool())) 
      .handle(myFileProcessor) 
      .handle(Files.outboundAdapter(new File(outDir)).autoCreateDirectory(true).get()) 
      .get(); 
} 

这也行!我不知道它只是一种风格,或者一种方法比另一种更好。如果是这样,哪种方法更好。其次,在上述情况下,“文件写入”(即最后一步)是顺序的还是将在不同的线程中工作。如果我还需要并发性,我应该在句柄(fileProcessor)和句柄(outBoundAdapter)之间引入另一个taskExecutor通道吗? 最终outboundadapter将是一个远程文件S3适配器,因此问题

回答

0

这只是风格,虽然我倾向于喜欢第二个。文件适配器是线程安全的。

通常,写操作将并行进行。

唯一的例外是,如果您使用FileExistsMode.APPEND进行编写,在这种情况下会在写入期间保持锁定,并且如果文件名称与同一个锁(有256个锁)一起散列为另一个文件写的话,它会在第一次完成时运行。

相关问题