2017-04-17

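Problem implementing spring-integration-aws

I'm using Spring Integration AWS to poll an S3 bucket, fetch the files from it, and process them with Spring Integration. Here is what I have: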

AmazonS3 amazonS3 = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretKey)); 

@Bean 
IntegrationFlow fileReadingFlow() { 
    return IntegrationFlows 
       .from(s3InboundFileSynchronizingMessageSource(), 
         e -> e.poller(p -> p.fixedDelay(30, TimeUnit.SECONDS))) 
      .handle(receiptProcessor()) 
      .get(); 
} 
@Bean 
public S3InboundFileSynchronizer s3InboundFileSynchronizer() { 
    S3InboundFileSynchronizer synchronizer = new S3InboundFileSynchronizer(amazonS3); 
    synchronizer.setDeleteRemoteFiles(false); 
    synchronizer.setPreserveTimestamp(true); 
    synchronizer.setRemoteDirectory(s3BucketName.concat("/").concat(s3InboundFolder)); 
    synchronizer.setFilter(new S3RegexPatternFileListFilter(".*\\.dat\\.{0,1}\\d{0,2}")); 
    return synchronizer; 
} 

@Bean 
public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource() { 
    S3InboundFileSynchronizingMessageSource messageSource = 
      new S3InboundFileSynchronizingMessageSource(s3InboundFileSynchronizer()); 
    messageSource.setAutoCreateLocalDirectory(false); 
    messageSource.setLocalDirectory(new File(inboundDir)); 
    messageSource.setLocalFilter(new AcceptOnceFileListFilter<File>()); 
    return messageSource; 
} 
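The filter regex above can be tested on its own with java.util.regex to see which keys it admits. This sketch assumes S3RegexPatternFileListFilter does a full match (Matcher.matches()) against the whole object key. The class and method names are made up for illustration.

```java
import java.util.regex.Pattern;

// Sketch: checks which S3 keys the filter regex from the question accepts.
// Assumption: the filter does a full match against the whole object key.
public class DatKeyFilterCheck {

    // Same pattern string as passed to S3RegexPatternFileListFilter above.
    private static final Pattern DAT_PATTERN = Pattern.compile(".*\\.dat\\.{0,1}\\d{0,2}");

    static boolean accepts(String key) {
        return DAT_PATTERN.matcher(key).matches();
    }

    public static void main(String[] args) {
        String[] keys = {
            "receipts/originalReceipts/inbound/receipt1.dat",   // accepted
            "receipts/originalReceipts/inbound/receipt1.dat.7", // accepted
            "receipts/originalReceipts/inbound/receipt1.dat.123", // rejected: 3 digits
            "receipts/originalReceipts/inbound/receipt1.txt"    // rejected
        };
        for (String key : keys) {
            System.out.println(key + " -> " + accepts(key));
        }
    }
}
```

Under that assumption, the leading `.*` lets any folder path through. The regex therefore narrows the file names but does not restrict which folder they come from.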

My S3 bucket and key are:

bucketName = shipmentReceipts 
key = receipts/originalReceipts/inbound/receipt1.dat 

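With this implementation I'm facing two problems:
1. The local file name under the inboundDir folder is generated from the full S3 key, so the file ends up at a different (nested) path, which leads to a FileNotFoundException. I traced the error to this code in AbstractInboundFileSynchronizer.java: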

protected void copyFileToLocalDirectory(String remoteDirectoryPath, F remoteFile, File localDirectory, 
     Session<F> session) throws IOException { 
    String remoteFileName = this.getFilename(remoteFile); 
    String localFileName = this.generateLocalFileName(remoteFileName); 
    String remoteFilePath = remoteDirectoryPath != null 
      ? (remoteDirectoryPath + this.remoteFileSeparator + remoteFileName) 
      : remoteFileName; 
    if (!this.isFile(remoteFile)) { 
     if (this.logger.isDebugEnabled()) { 
      this.logger.debug("cannot copy, not a file: " + remoteFilePath); 
     } 
     return; 
    } 

    File localFile = new File(localDirectory, localFileName); 
    if (!localFile.exists()) {........ 

So it ends up looking for the file path C:\SpringAws\S3inbound\receipts\originalReceipts\inbound\receipt1.dat. It does not find that path and throws the FileNotFoundException. Instead, the file should simply be copied into the local directory as C:\SpringAws\S3inbound\receipt1.dat.
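The failure can be reproduced without Spring. The sketch below uses plain java.io and works in a fresh temporary directory. It builds the local File two ways, first from the full key and then from the last key segment only, and tries to open each one for writing. The class and method names are hypothetical.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;

// Sketch: reproduces the FileNotFoundException described above with plain
// java.io. Class and method names are hypothetical.
public class NestedKeyFailureDemo {

    /** Returns true if the file can be opened for writing (this creates it). */
    static boolean canOpenForWrite(File file) {
        try (FileOutputStream out = new FileOutputStream(file)) {
            return true;
        } catch (IOException e) { // FileNotFoundException is an IOException
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        File localDir = Files.createTempDirectory("S3inbound").toFile();
        String key = "receipts/originalReceipts/inbound/receipt1.dat";

        // Full key as the local name: receipts/originalReceipts/inbound do not
        // exist under localDir, so the stream cannot be opened.
        File nested = new File(localDir, key);
        // Last key segment only: the file goes straight into localDir.
        File flat = new File(localDir, key.substring(key.lastIndexOf('/') + 1));

        System.out.println(canOpenForWrite(nested)); // false
        System.out.println(canOpenForWrite(flat));   // true
    }
}
```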

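2. While pulling the S3 objects, I noticed that it pulls every object under shipmentReceipts/receipts instead of only those under shipmentReceipts/receipts/originalReceipts/inbound. On further debugging I found that this snippet in S3Session.java is responsible: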

    @Override 
    public S3ObjectSummary[] list(String path) throws IOException { 
    Assert.hasText(path, "'path' must not be empty String."); 
    String[] bucketPrefix = path.split("/"); 
    Assert.state(bucketPrefix.length > 0 && bucketPrefix[0].length() >= 3, 
         "S3 bucket name must be at least 3 characters long."); 
    
    String bucket = resolveBucket(bucketPrefix[0]); 
    
    ListObjectsRequest listObjectsRequest = new ListObjectsRequest() 
         .withBucketName(bucket); 
    if (bucketPrefix.length > 1) { 
        listObjectsRequest.setPrefix(bucketPrefix[1]); 
    } 
    
    /* 
    For listing objects, Amazon S3 returns up to 1,000 keys in the response. 
    If you have more than 1,000 keys in your bucket, the response will be truncated. 
    You should always check for if the response is truncated. 
    */ 
    ObjectListing objectListing; 
    List<S3ObjectSummary> objectSummaries = new ArrayList<>(); 
    do {...... 
    
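It sets the prefix to only the path segment right after the first slash (bucketPrefix[1]), so everything after the second slash is dropped. How can I work around these issues? Thanks!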
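The prefix behaviour follows directly from String.split. This sketch copies the split from the excerpt and contrasts it with split("/", 2), which cuts only at the first slash. The second method is a hypothetical workaround for illustration. It is not necessarily how the library itself fixed the problem.

```java
// Sketch: how the S3Session.list(path) excerpt derives the S3 prefix,
// versus a split limited to two parts (hypothetical workaround).
public class PrefixSplitDemo {

    // Mirrors the excerpt: split on every '/' and keep only element [1].
    static String prefixAsInExcerpt(String path) {
        String[] bucketPrefix = path.split("/");
        return bucketPrefix.length > 1 ? bucketPrefix[1] : null;
    }

    // Split only at the first '/', so the whole remaining path becomes the prefix.
    static String prefixWithLimit(String path) {
        String[] bucketPrefix = path.split("/", 2);
        return bucketPrefix.length > 1 ? bucketPrefix[1] : null;
    }

    public static void main(String[] args) {
        String path = "shipmentReceipts/receipts/originalReceipts/inbound";
        System.out.println(prefixAsInExcerpt(path)); // receipts
        System.out.println(prefixWithLimit(path));   // receipts/originalReceipts/inbound
    }
}
```

S3 prefixes are plain string prefixes. As a result, receipts/originalReceipts/inbound would also match keys under receipts/originalReceipts/inbound2/. Ending the configured directory with a slash narrows the listing to that one folder.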

    Answers


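    The first problem, with nested paths, is a known issue. It has been fixed, with the RecursiveDirectoryScanner, in the latest 5.0 M3: https://spring.io/blog/2017/04/05/spring-integration-5-0-milestone-3-available

    In the meantime you have to specify a LocalFilenameGeneratorExpression like this: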

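    // PARSER: a SpEL parser, e.g. new SpelExpressionParser() from org.springframework.expression.spel.standard
    Expression expression = PARSER.parseExpression("#this.contains('/') ? #this.substring(#this.lastIndexOf('/') + 1) : #this"); 
    synchronizer.setLocalFilenameGeneratorExpression(expression); 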
    

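    The key of an S3ObjectSummary is the full object path without the bucket name. The expression therefore keeps only the part after the last '/'.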
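In plain Java, the SpEL expression above amounts to the method below. This is a sketch for reasoning about the expression. It assumes #this is the remote file name handed to the expression, i.e. the full key. The class name is made up.

```java
// Sketch: plain-Java equivalent of the SpEL expression
//   #this.contains('/') ? #this.substring(#this.lastIndexOf('/') + 1) : #this
// Assumption: #this is the remote file name, i.e. the full S3 key.
public class LocalFilenameFromKey {

    static String localFilename(String key) {
        return key.contains("/") ? key.substring(key.lastIndexOf('/') + 1) : key;
    }

    public static void main(String[] args) {
        System.out.println(localFilename("receipts/originalReceipts/inbound/receipt1.dat")); // receipt1.dat
        System.out.println(localFilename("receipt1.dat"));                                   // receipt1.dat
    }
}
```

A key that ends in '/' (an S3 "folder" placeholder) maps to an empty name.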

    The second "nested path" problem has been fixed via https://github.com/spring-projects/spring-integration-aws/issues/45. The fix is available in 1.1.0.M1: https://spring.io/blog/2017/03/09/spring-integration-extension-for-aws-1-1-0-m1-available


    Thanks, Artem! I did use version 1.1.0.M1 of spring-integration-aws, but I still wrote my own class to work around the problems above. – user5758361


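    I'm using Spring Integration 5.0.0.M4 and Spring Integration AWS 1.1.0.M2, and I still have the same problem when using a bucket name like 'abc/def/'. See my answer below for a workaround. I'm streaming, so there is no local file name I can manipulate. –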


    Would you mind raising a GH issue with more details so we can reproduce it on our side? Thanks –


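    As Artem said, I was not using the latest milestone release of spring-integration-aws. I found it easier to write a custom class that extends AbstractInboundFileSynchronizer to solve my problem. Here's the class I created: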

    public class MyAbstractInboundFileSynchronizer extends AbstractInboundFileSynchronizer<S3ObjectSummary> { 
    
    private volatile String remoteFileSeparator = "/"; 
    private volatile String temporaryFileSuffix = ".writing"; 
    private volatile boolean deleteRemoteFiles; 
    private volatile boolean preserveTimestamp; 
    private volatile FileListFilter<S3ObjectSummary> filter; 
    private volatile Expression localFilenameGeneratorExpression; 
    private volatile EvaluationContext evaluationContext; 
    
    @Override 
    public void setLocalFilenameGeneratorExpression(Expression localFilenameGeneratorExpression) { 
        super.setLocalFilenameGeneratorExpression(localFilenameGeneratorExpression); 
        this.localFilenameGeneratorExpression = localFilenameGeneratorExpression; 
    } 
    
    @Override 
    public void setIntegrationEvaluationContext(EvaluationContext evaluationContext) { 
        super.setIntegrationEvaluationContext(evaluationContext); 
        this.evaluationContext = evaluationContext; 
    } 
    
    @Override 
    public void setRemoteFileSeparator(String remoteFileSeparator) { 
        super.setRemoteFileSeparator(remoteFileSeparator); 
        this.remoteFileSeparator = remoteFileSeparator; 
    } 
    
    public MyAbstractInboundFileSynchronizer() { 
        this(new S3SessionFactory()); 
    } 
    
    public MyAbstractInboundFileSynchronizer(AmazonS3 amazonS3) { 
        this(new S3SessionFactory(amazonS3)); 
    } 
    
    /** 
    * Create a synchronizer with the {@link SessionFactory} used to acquire {@link Session} instances. 
    * @param sessionFactory The session factory. 
    */ 
    public MyAbstractInboundFileSynchronizer(SessionFactory<S3ObjectSummary> sessionFactory) { 
        super(sessionFactory); 
        setRemoteDirectoryExpression(new LiteralExpression(null)); 
        setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "s3MessageSource")); 
    } 
    
    @Override 
    public final void setRemoteDirectoryExpression(Expression remoteDirectoryExpression) { 
        super.setRemoteDirectoryExpression(remoteDirectoryExpression); 
    } 
    
    @Override 
    public final void setFilter(FileListFilter<S3ObjectSummary> filter) { 
        super.setFilter(filter); 
    } 
    
    @Override 
    protected boolean isFile(S3ObjectSummary file) { 
        return true; 
    } 
    
    @Override 
    protected String getFilename(S3ObjectSummary file) { 
        if(file != null){ 
         String key = file.getKey(); 
         String fileName = key.substring(key.lastIndexOf('/')+1); 
         return fileName; 
        } 
        else return null; 
    } 
    
    @Override 
    protected long getModified(S3ObjectSummary file) { 
        return file.getLastModified().getTime(); 
    } 
    
    @Override 
    protected void copyFileToLocalDirectory(String remoteDirectoryPath, S3ObjectSummary remoteFile, File localDirectory, 
                 Session<S3ObjectSummary> session) throws IOException { 
        String remoteFileName = this.getFilename(remoteFile); 
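        // getFilename() already strips the key to its last segment,
        // so it is used directly as the local file name.
        //String localFileName = this.generateLocalFileName(remoteFileName); 
        String localFileName = remoteFileName; 
        // No separator is inserted here: this assumes remoteDirectoryPath
        // already ends with '/' (otherwise add this.remoteFileSeparator).
        String remoteFilePath = remoteDirectoryPath != null 
          ? (remoteDirectoryPath + remoteFileName) 
          : remoteFileName; 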
        if (!this.isFile(remoteFile)) { 
         if (this.logger.isDebugEnabled()) { 
          this.logger.debug("cannot copy, not a file: " + remoteFilePath); 
         } 
         return; 
        } 
    
        File localFile = new File(localDirectory, localFileName); 
        if (!localFile.exists()) { 
         String tempFileName = localFile.getAbsolutePath() + this.temporaryFileSuffix; 
         File tempFile = new File(tempFileName); 
         OutputStream outputStream = new BufferedOutputStream(new FileOutputStream(tempFile)); 
         try { 
          session.read(remoteFilePath, outputStream); 
         } 
         catch (Exception e) { 
          if (e instanceof RuntimeException) { 
           throw (RuntimeException) e; 
          } 
          else { 
           throw new MessagingException("Failure occurred while copying from remote to local directory", e); 
          } 
         } 
         finally { 
          try { 
           outputStream.close(); 
          } 
          catch (Exception ignored2) { 
          } 
         } 
    
         if (tempFile.renameTo(localFile)) { 
          if (this.deleteRemoteFiles) { 
           session.remove(remoteFilePath); 
           if (this.logger.isDebugEnabled()) { 
            this.logger.debug("deleted " + remoteFilePath); 
           } 
          } 
         } 
         if (this.preserveTimestamp) { 
          localFile.setLastModified(getModified(remoteFile)); 
         } 
        } 
    } 
    } 
    

    I also updated the LocalFilenameGeneratorExpression as Artem suggested. Thanks!
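The copy step in the class above first downloads into a file carrying the temporary suffix. It renames that file to its final name only once the transfer is complete. Below is a minimal java.nio sketch of the same pattern, which lets the NIO calls handle the stream that the class above closes by hand. The InputStream stands in for session.read(...), and the class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Sketch of the "write to <name>.writing, then rename" step. The InputStream
// stands in for session.read(remoteFilePath, ...); names are hypothetical.
public class TempThenRenameCopy {

    static final String TEMPORARY_FILE_SUFFIX = ".writing";

    /**
     * Copies remote content to localFile via a temporary sibling file.
     * Returns false (copying nothing) if localFile already exists,
     * mirroring the "if (!localFile.exists())" check in the class above.
     */
    static boolean copyViaTempFile(InputStream remote, Path localFile) throws IOException {
        if (Files.exists(localFile)) {
            return false;
        }
        Path tempFile = localFile.resolveSibling(localFile.getFileName() + TEMPORARY_FILE_SUFFIX);
        try {
            Files.copy(remote, tempFile, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            Files.deleteIfExists(tempFile); // don't leave a partial download behind
            throw e;
        }
        // Rename within the same directory; only now does the final name appear.
        Files.move(tempFile, localFile);
        return true;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("S3inbound");
        Path local = dir.resolve("receipt1.dat");
        try (InputStream fakeRemote =
                 new ByteArrayInputStream("receipt data".getBytes(StandardCharsets.UTF_8))) {
            System.out.println(copyViaTempFile(fakeRemote, local)); // true
        }
        System.out.println(Files.exists(dir.resolve("receipt1.dat.writing"))); // false
        System.out.println(copyViaTempFile(new ByteArrayInputStream(new byte[0]), local)); // false
    }
}
```

Because the final name only appears after the rename, anything that watches the local directory for receipt1.dat never sees a half-written file.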


    I'm using Spring Integration 5.0.0.M4 and Spring Integration AWS 1.1.0.M2, and I still have the same problem when using a bucket name like abc/def/. See my answer for a workaround. –


    @user5758361 The first problem you described, with nested paths, can also be solved by overriding S3FileInfo:

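    import java.io.UncheckedIOException;

    import com.amazonaws.services.s3.model.S3ObjectSummary;
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.ObjectWriter;
    import org.apache.commons.io.FilenameUtils;

    public class S3FileInfo extends org.springframework.integration.aws.support.S3FileInfo { 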
        private static final ObjectWriter OBJECT_WRITER = new ObjectMapper().writerFor(S3ObjectSummary.class); 
    
        public S3FileInfo(S3ObjectSummary s3ObjectSummary) { 
         super(s3ObjectSummary); 
        } 
    
        // Keep only the last path segment (e.g. receipts/a/b.dat -> b.dat)
        @Override 
        public String getFilename() { 
         return FilenameUtils.getName(super.getFilename()); 
        } 
    
        @Override 
        public String toJson() { 
         try { 
          return OBJECT_WRITER.writeValueAsString(super.getFileInfo()); 
         } catch (JsonProcessingException e) { 
          throw new UncheckedIOException(e); 
         } 
        } 
    } 
    

    toJson is overridden to avoid an NPE for some objects.

    Using it in the streaming message source:

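    import static java.util.stream.Collectors.toList;

    import java.util.Collection;
    import java.util.Comparator;
    import java.util.List;

    import com.amazonaws.services.s3.model.S3ObjectSummary;
    import org.springframework.integration.file.remote.AbstractFileInfo;
    import org.springframework.integration.file.remote.RemoteFileTemplate;

    public class S3StreamingMessageSource extends org.springframework.integration.aws.inbound.S3StreamingMessageSource { 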
        public S3StreamingMessageSource(RemoteFileTemplate<S3ObjectSummary> template) { 
         super(template, null); 
        } 
    
        public S3StreamingMessageSource(RemoteFileTemplate<S3ObjectSummary> template, 
                Comparator<AbstractFileInfo<S3ObjectSummary>> comparator) { 
         super(template, comparator); 
        } 
    
        @Override 
        protected List<AbstractFileInfo<S3ObjectSummary>> asFileInfoList(Collection<S3ObjectSummary> collection) { 
         return collection.stream() 
           .map(S3FileInfo::new) 
           .collect(toList()); 
        } 
    } 
    

    By the way, I'm using Spring Integration 5.0.0.M4 and Spring Integration AWS 1.1.0.M2, and I still have the same problem when using a bucket like abc/def/.