2016-11-23 110 views
1

我是Spring Integration的新手。我正在开发一个解决方案,但在使用入站文件适配器(FileReadingMessageSource)时遇到了一个具体问题。我必须从不同的目录读取文件并处理它们,然后把这些文件保存到不同的目录中。据我所知,目录名称在流程启动时就已经固定。有人能帮我实现针对不同的请求使用不同的目录名称吗?(主题:Spring Integration 文件读取)

我尝试了以下方法。首先,我不确定这是否是正确的做法,而且它只对一个目录有效。我认为Poller一直在等待更多的文件,再也没有返回去读取另一个目录。

/**
 * Spring Boot application that repeatedly asks on standard input for a directory name and
 * hands that name to the {@link FileProcessingService} gateway.
 *
 * <p>The gateway sends the name to {@code requestChannel}; the {@code fileReader} handler reads
 * files from that directory and forwards them to the {@code fileIn} queue channel, from which
 * {@code fileProcessingFlow} passes them to {@code myFileProcessor} and then to an outbound file
 * adapter writing into {@code outDir}.
 */
@SpringBootApplication 
@EnableIntegration 
@IntegrationComponentScan 
public class SiSampleFileProcessor { 

    // Handler invoked for every message taken from the fileIn channel (see fileProcessingFlow).
    @Autowired 
    MyFileProcessor myFileProcessor; 

    // Target directory for the outbound file adapter, read from the si.outdir property.
    @Value("${si.outdir}") 
    String outDir; 

    @Autowired 
    Environment env; 

    /**
     * Starts the application context, then loops reading directory names from standard input
     * until an empty line or "exit" is entered, and finally closes the context.
     */
    public static void main(String[] args) throws IOException { 
     ConfigurableApplicationContext ctx = new SpringApplication(SiSampleFileProcessor.class).run(args); 
     FileProcessingService gateway = ctx.getBean(FileProcessingService.class); 
     boolean process = true; 
     while (process) { 
      System.out.println("Please enter the input Directory: "); 
      // NOTE(review): a new Scanner over System.in is created on every iteration and never
      // closed; a single shared Scanner would probably be safer - confirm.
      String inDir = new Scanner(System.in).nextLine(); 
      if (inDir.isEmpty() || inDir.equals("exit")) { 
       process=false; 
      } else { 
       System.out.println("Processing... " + inDir); 
       // The gateway's return value is ignored here.
       gateway.processFilesin(inDir); 
      } 
     } 
     ctx.close(); 
    } 

    /** Gateway whose calls are sent as messages to {@code requestChannel}. */
    @MessagingGateway(defaultRequestChannel="requestChannel") 
    public interface FileProcessingService { 
     String processFilesin(String inputDir); 
    } 

    /** Default poller: fixed delay of 1000 between polls. */
    @Bean(name = PollerMetadata.DEFAULT_POLLER) 
    public PollerMetadata poller() {          
    return Pollers.fixedDelay(1000).get(); 
    } 

    /** Direct channel that receives the directory names sent through the gateway. */
    @Bean 
    public MessageChannel requestChannel() { 
     return new DirectChannel(); 
    } 

    /**
     * Handler for {@code requestChannel}: treats the payload as a directory path, reads files
     * from it with a locally created {@link FileReadingMessageSource} and sends each resulting
     * message to {@code fileInChannel()}.
     */
    // NOTE(review): this method is both a @Bean factory and a @ServiceActivator; presumably that
    // combination expects the bean to be a MessageHandler - confirm.
    @ServiceActivator(inputChannel = "requestChannel") 
    @Bean 
    GenericHandler<String> fileReader() { 
     return new GenericHandler<String>() { 
      @Override 
      public Object handle(String p, Map<String, Object> map) { 
       // NOTE(review): the source is constructed by hand, so the container does not run its
       // initialization/lifecycle callbacks; afterPropertiesSet(), start() and stop() are not
       // called anywhere in this method - confirm whether they are required.
       FileReadingMessageSource fileSource = new FileReadingMessageSource(); 
       fileSource.setDirectory(new File(p)); 
       Message<File> msg; 
       // Forward messages until the source reports nothing more to read.
       while((msg = fileSource.receive()) != null) { 
        fileInChannel().send(msg); 
       } 
       // NOTE(review): the gateway method is declared to return String; returning null here
       // presumably leaves the gateway caller waiting for a reply - confirm.
       return null; // Not sure what to return! 
      } 
     }; 
    } 

    /** Queue channel named "fileIn" that buffers the file messages for fileProcessingFlow. */
    @Bean 
    public MessageChannel fileInChannel() { 
     return MessageChannels.queue("fileIn").get(); 
    } 

    /**
     * Flow that takes messages from {@code fileInChannel()}, passes them to
     * {@code myFileProcessor}, and hands the result to an outbound file adapter that writes
     * into {@code outDir}, creating the directory if needed.
     */
    @Bean 
    public IntegrationFlow fileProcessingFlow() { 
     return IntegrationFlows.from(fileInChannel()) 
       .handle(myFileProcessor) 
       .handle(Files.outboundAdapter(new File(outDir)).autoCreateDirectory(true).get()) 
       .get(); 
    }  
} 

编辑:根据Gary的回答,我修改了部分方法,如下所示:

/**
 * Messaging gateway: each call is sent as a message to {@code requestChannel} and the reply
 * is returned to the caller.
 */
@MessagingGateway(defaultRequestChannel = "requestChannel")
public interface FileProcessingService {

    /**
     * Requests processing of the files found in the given directory.
     *
     * @param inputDir path of the directory to read files from
     * @return the boolean reply produced by the handler subscribed to {@code requestChannel}
     */
    boolean processFilesin(String inputDir);
}

/**
 * Reads every file currently reported for {@code inDir} and forwards each one to
 * {@code fileInChannel()}.
 *
 * <p>The {@link FileReadingMessageSource} is created per request and is not managed by the
 * container, so its initialization and lifecycle methods are invoked explicitly here.
 *
 * @param inDir path of the directory to read files from
 * @return {@code true} once all files have been forwarded, so the gateway caller gets a reply
 */
@ServiceActivator(inputChannel = "requestChannel")
public boolean fileReader(String inDir) {
    FileReadingMessageSource fileSource = new FileReadingMessageSource();
    fileSource.setDirectory(new File(inDir));
    fileSource.afterPropertiesSet();
    fileSource.start();
    try {
        Message<File> msg;
        // receive() returns null when the source has no further file to hand out.
        while ((msg = fileSource.receive()) != null) {
            fileInChannel().send(msg);
        }
    } finally {
        // Always stop the manually started source, even if receive() or send() throws.
        fileSource.stop();
    }
    System.out.println("Sent all files in directory: " + inDir);
    return true;
}

现在按预期工作。

回答

0

FileReadingMessageSource在内部使用DirectoryScanner;它通常在属性注入之后由Spring进行设置。由于您是在Spring之外管理这个对象,因此需要自己调用Spring bean的初始化和生命周期方法:afterPropertiesSet()、start()和stop()。当receive()返回null时,调用stop()。

> return null; // Not sure what to return! 

如果您不返回任何内容,您的调用线程将挂起在等待响应的网关中。您可以将网关更改为void,或者由于您的网关期望有一个字符串,只需返回一些值即可。

但是,您的调用代码无论如何不看结果。

> gateway.processFilesin(inDir); 

此外,请从@ServiceActivator方法上删除@Bean;如果使用@Bean这种风格,该bean必须是一个MessageHandler。

+0

谢谢,现在已按预期工作。我已经根据你的建议编辑了这篇帖子。但还有一点需要澄清:对于类似的需求(即读取不同的目录),如果文件来源是S3,我们该怎么办? – pkm

+0

@pkm 可以把S3挂载为文件系统并监听其变化;或者使用在S3上执行put/post时触发的AWS微服务流程,然后对该事件采取相应的处理 –

+0

[spring-integration-aws](https://github.com/spring-projects/spring-integration-aws)项目提供了一个消息源,但它稍微复杂一些:它先把远程目录同步到本地,然后对本地目录使用`FileReadingMessageSource`。您应该改用`S3RemoteFileTemplate`。 –

0

您可以使用此代码

FileProcessor.java

import org.springframework.messaging.Message; 
import org.springframework.stereotype.Component; 
/**
 * Message handler that writes the text payload of each message to standard output.
 */
@Component
public class FileProcessor {

    /** Name of the message header looked up for the originating file's name. */
    private static final String HEADER_FILE_NAME = "file_name";

    /** Format for a "name + content" line; not referenced by the current implementation. */
    private static final String MSG = "%s received. Content: %s";

    /**
     * Prints the payload of the given message.
     *
     * @param msg message whose String payload is printed
     */
    public void process(Message<String> msg) {
        // The header is still looked up and cast, although its value is not printed.
        final Object headerValue = msg.getHeaders().get(HEADER_FILE_NAME);
        final String fileName = (String) headerValue;

        final String body = msg.getPayload();
        System.out.println(body);
    }
}

LastModifiedFileFilter.java

package com.example.demo; 

import org.springframework.integration.file.filters.AbstractFileListFilter; 

import java.io.File; 
import java.util.HashMap; 
import java.util.Map; 

/**
 * File list filter that accepts a file the first time it is seen and again whenever its
 * last-modified timestamp differs from the one recorded on the previous check.
 */
public class LastModifiedFileFilter extends AbstractFileListFilter<File> {

    /** Last-modified timestamp recorded for each file, keyed by absolute path. */
    private final Map<String, Long> files = new HashMap<>();

    /** Guards every access to {@link #files}. */
    private final Object monitor = new Object();

    /**
     * Records the file's current last-modified timestamp and reports whether it changed.
     *
     * @param file candidate file
     * @return {@code true} if the file was not recorded before or its timestamp differs from
     *         the previously recorded one
     */
    @Override
    protected boolean accept(File file) {
        // Read the timestamp once so the stored value and the compared value cannot diverge
        // when the file is modified between two reads.
        final long modified = file.lastModified();
        // Key by absolute path so same-named files in different directories do not
        // overwrite each other's recorded timestamp.
        final String key = file.getAbsolutePath();

        synchronized (this.monitor) {
            final Long previous = files.put(key, modified);
            return previous == null || previous.longValue() != modified;
        }
    }
}

主类= DemoApplication.java

package com.example.demo; 

import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.EnableAutoConfiguration; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration; 
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration; 
import java.io.File; 
import java.io.IOException; 
import java.nio.charset.Charset; 
import org.apache.commons.io.FileUtils; 
import org.springframework.context.annotation.Bean; 
import org.springframework.context.annotation.Configuration; 
import org.springframework.integration.annotation.Aggregator; 
import org.springframework.integration.annotation.InboundChannelAdapter; 
import org.springframework.integration.annotation.Poller; 
import org.springframework.integration.channel.DirectChannel; 
import org.springframework.integration.channel.QueueChannel; 
import org.springframework.integration.core.MessageSource; 
import org.springframework.integration.dsl.IntegrationFlow; 
import org.springframework.integration.dsl.IntegrationFlows; 
import org.springframework.integration.dsl.channel.MessageChannels; 
import org.springframework.integration.dsl.core.Pollers; 
import org.springframework.integration.file.FileReadingMessageSource; 
import org.springframework.integration.file.filters.CompositeFileListFilter; 
import org.springframework.integration.file.filters.SimplePatternFileListFilter; 
import org.springframework.integration.file.transformer.FileToStringTransformer; 
import org.springframework.integration.scheduling.PollerMetadata; 
import org.springframework.messaging.MessageChannel; 
import org.springframework.messaging.PollableChannel; 
import org.springframework.stereotype.Component; 


/**
 * Spring Boot entry point that wires a file-polling integration flow: {@code *.log} files in
 * {@link #DIRECTORY} are published to {@code fileInputChannel}, converted to a String and
 * handed to the {@code process} method of the {@code fileProcessor} bean.
 */
@SpringBootApplication
@Configuration
public class DemoApplication {

    /** Directory watched by the inbound file adapter. */
    private static final String DIRECTORY = "E:/usmandata/logs/input/";

    /** Boots the Spring application context. */
    public static void main(String[] args) throws IOException, InterruptedException {
        SpringApplication.run(DemoApplication.class, args);
    }

    /**
     * Flow from {@code fileInputChannel}: transforms each file to a String, then invokes
     * {@code process} on the bean named {@code fileProcessor}.
     */
    @Bean
    public IntegrationFlow processFileFlow() {
        return IntegrationFlows.from("fileInputChannel")
                .transform(fileToStringTransformer())
                .handle("fileProcessor", "process")
                .get();
    }

    /** Direct channel connecting the inbound file adapter to {@link #processFileFlow()}. */
    @Bean
    public MessageChannel fileInputChannel() {
        return new DirectChannel();
    }

    /**
     * Inbound adapter polled with a fixed delay of "1000" that reads from {@link #DIRECTORY},
     * creating the directory if it does not exist. A file is emitted only if it passes both
     * filters, applied in this order: name pattern {@code *.log}, then
     * {@link LastModifiedFileFilter}.
     */
    @Bean
    @InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
    public MessageSource<File> fileReadingMessageSource() {
        final SimplePatternFileListFilter logFilesOnly = new SimplePatternFileListFilter("*.log");
        final LastModifiedFileFilter changedFilesOnly = new LastModifiedFileFilter();

        final CompositeFileListFilter<File> filterChain = new CompositeFileListFilter<>();
        filterChain.addFilter(logFilesOnly);
        filterChain.addFilter(changedFilesOnly);

        final FileReadingMessageSource messageSource = new FileReadingMessageSource();
        messageSource.setDirectory(new File(DIRECTORY));
        messageSource.setAutoCreateDirectory(true);
        messageSource.setFilter(filterChain);
        return messageSource;
    }

    /** Transformer used by the flow to turn a file payload into a String payload. */
    @Bean
    public FileToStringTransformer fileToStringTransformer() {
        return new FileToStringTransformer();
    }

    /**
     * Handler bean referenced by name in {@link #processFileFlow()}.
     */
    // NOTE(review): FileProcessor is also annotated with @Component; if it is picked up by
    // component scanning, this explicit @Bean presumably defines the same bean name twice -
    // confirm which registration is intended.
    @Bean
    public FileProcessor fileProcessor() {
        return new FileProcessor();
    }
}