2016-10-28

Google Dataflow: Coder for ZipInputStream

I want to unzip some files (themselves containing zip files) from Google Storage to Google Storage.

So I have the following DoFn that collects ZipInputStreams:

static class UnzipFilesFN extends DoFn<GcsPath, ZipInputStream> {

    private static final long serialVersionUID = 7373250969860890761L;

    @Override
    public void processElement(ProcessContext c) {
        GcsPath p = c.element();
        try {
            ZipInputStream zis = new ZipInputStream(new FileInputStream(p.toString()));
            c.output(zis);
        } catch (FileNotFoundException fnfe) {
            // error handling omitted
        }
    }
}

And the following custom sink does the unzipping and writing:

public static class ZipIO {
    public static class Sink extends com.google.cloud.dataflow.sdk.io.Sink<ZipInputStream> {

        private static final long serialVersionUID = -7414200726778377175L;
        final String unzipTarget;

        public Sink withDestinationPath(String s) {
            // Compare string contents, not references: s != "" is true for almost any string
            if (s != null && !s.isEmpty()) {
                return new Sink(s);
            } else {
                throw new IllegalArgumentException("must assign destination path");
            }
        }

        protected Sink(String path) {
            this.unzipTarget = path;
        }

        @Override
        public void validate(PipelineOptions po) {
            if (unzipTarget == null) {
                throw new RuntimeException("unzipTarget must be set");
            }
        }

        @Override
        public ZipFileWriteOperation createWriteOperation(PipelineOptions po) {
            return new ZipFileWriteOperation(this);
        }
    }

    private static class ZipFileWriteOperation extends WriteOperation<ZipInputStream, UnzipResult> {

        private static final long serialVersionUID = 7976541367499831605L;
        private final ZipIO.Sink sink;

        public ZipFileWriteOperation(ZipIO.Sink sink) {
            this.sink = sink;
        }

        @Override
        public void initialize(PipelineOptions po) throws Exception {
        }

        @Override
        public void finalize(Iterable<UnzipResult> writerResults, PipelineOptions po) throws Exception {
            long totalFiles = 0;
            for (UnzipResult r : writerResults) {
                totalFiles += r.filesUnziped;
            }
            LOG.info("Unzipped {} Files", totalFiles);
        }

        @Override
        public ZipIO.Sink getSink() {
            return sink;
        }

        @Override
        public ZipWriter createWriter(PipelineOptions po) throws Exception {
            return new ZipWriter(this);
        }
    }

    private static class ZipWriter extends Writer<ZipInputStream, UnzipResult> {
        private final ZipFileWriteOperation writeOp;
        private long totalUnzipped = 0;

        ZipWriter(ZipFileWriteOperation writeOp) {
            this.writeOp = writeOp;
        }

        @Override
        public void open(String uID) throws Exception {
        }

        @Override
        public void write(ZipInputStream zis) {
            byte[] buffer = new byte[1024];
            try {
                ZipEntry ze = zis.getNextEntry();
                while (ze != null) {
                    File f = new File(writeOp.sink.unzipTarget + "/" + ze.getName());
                    // try-with-resources closes the output file even if a read fails
                    try (FileOutputStream fos = new FileOutputStream(f)) {
                        int len;
                        while ((len = zis.read(buffer)) > 0) {
                            fos.write(buffer, 0, len);
                        }
                    }
                    this.totalUnzipped++;
                    zis.closeEntry();
                    // Advance to the next entry; without this the loop never terminates
                    ze = zis.getNextEntry();
                }
                zis.close();
            } catch (Exception e) {
                // error handling omitted
            }
        }

        @Override
        public UnzipResult close() throws Exception {
            return new UnzipResult(this.totalUnzipped);
        }

        @Override
        public ZipFileWriteOperation getWriteOperation() {
            return writeOp;
        }
    }

    private static class UnzipResult implements Serializable {
        private static final long serialVersionUID = -8504626439217544799L;
        final long filesUnziped;

        public UnzipResult(long filesUnziped) {
            this.filesUnziped = filesUnziped;
        }
    }
}

}

When I try to run the pipeline, I get the following error:

Building a Coder from the fallback CoderProvider failed: Cannot provide coder for type java.util.zip.ZipInputStream: …37 could not provide a Coder for type java.util.zip.ZipInputStream: Cannot provide ProtoCoder because java.util.zip.ZipInputStream is not a subclass of com.google.protobuf.Message; … could not provide a Coder for type java.util.zip.ZipInputStream: Cannot provide SerializableCoder because java.util.zip.ZipInputStream does not implement Serializable.
    at com.google.cloud.dataflow.sdk.values.TypedPValue.inferCoderOrFail(TypedPValue.java:195)
    at com.google.cloud.dataflow.sdk.values.TypedPValue.getCoder(TypedPValue.java:48)
    at com.google.cloud.dataflow.sdk.values.PCollection.getCoder(PCollection.java:137)
    at com.google.cloud.dataflow.sdk.values.TypedPValue.finishSpecifying(TypedPValue.java:88)
    at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:332)
    at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:291)
    at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:174)

Which coder do I need to assign to handle ZipInputStreams?

Thanks & BR, Philipp

Answer


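A Coder is necessary so that a runner can materialize a PCollection to temporary storage and read it back, instead of holding it in memory. I can't think of a reasonable way to materialize a ZipInputStream object: this is a fundamental conceptual problem, not a Coder API problem.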
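To make the materialization point concrete, here is a small plain-Java sketch (no Dataflow classes; the class name `CoderCheck` and the bucket path are hypothetical). It round-trips values through Java serialization, which is roughly what a SerializableCoder-style fallback relies on. A path string survives the round trip, while a ZipInputStream cannot even be written, because it wraps live state (an underlying stream and a read position) rather than data.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.zip.ZipInputStream;

final class CoderCheck {
    // Encode a value to bytes and decode it again, as a runner would when
    // materializing an element with a serialization-based coder.
    static Object roundTrip(Object value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // A stream is live state, not data, so it does not implement Serializable.
        System.out.println(Serializable.class.isAssignableFrom(ZipInputStream.class)); // false

        // A path string is plain data and round-trips unchanged.
        String path = "gs://my-bucket/archive.zip"; // hypothetical path
        System.out.println(roundTrip(path).equals(path)); // true

        // Trying to encode the stream itself fails.
        try {
            roundTrip(new ZipInputStream(new ByteArrayInputStream(new byte[0])));
        } catch (NotSerializableException e) {
            System.out.println("NotSerializableException: " + e.getMessage());
        }
    }
}
```

This is why the usual fix is to pass something that *describes* the archive (its path) between steps and open the stream only where it is consumed.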

However, in your particular case, I think you can simply open the ZipInputStream inside your ZipWriter.write() function, and make ZipIO.Sink a Sink<GcsPath> rather than a Sink<ZipInputStream>.
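A minimal sketch of that idea, assuming local files and a path `String` as the element type (the class `PathUnzipper` is a hypothetical stand-in for `ZipWriter`, written without Dataflow classes so it runs on its own). The ZipInputStream is created and closed entirely inside `write()`, so it never has to be encoded between pipeline steps.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

// Hypothetical stand-in for ZipWriter: receives a path, opens the archive itself.
final class PathUnzipper {
    private final Path targetDir;
    private long totalUnzipped = 0;

    PathUnzipper(Path targetDir) {
        this.targetDir = targetDir.toAbsolutePath().normalize();
    }

    void write(String zipPath) throws IOException {
        byte[] buffer = new byte[8192];
        // The stream lives only for the duration of this call.
        try (ZipInputStream zis = new ZipInputStream(Files.newInputStream(Paths.get(zipPath)))) {
            for (ZipEntry ze = zis.getNextEntry(); ze != null; ze = zis.getNextEntry()) {
                if (ze.isDirectory()) {
                    continue; // only regular files are written and counted
                }
                Path out = targetDir.resolve(ze.getName()).normalize();
                // Reject entries like "../x" that would escape the target directory.
                if (!out.startsWith(targetDir)) {
                    throw new IOException("Entry escapes target directory: " + ze.getName());
                }
                Files.createDirectories(out.getParent());
                try (OutputStream os = Files.newOutputStream(out)) {
                    int len;
                    while ((len = zis.read(buffer)) != -1) {
                        os.write(buffer, 0, len);
                    }
                }
                totalUnzipped++;
            }
        }
    }

    long totalUnzipped() {
        return totalUnzipped;
    }
}
```

In the real sink the same body would go into `ZipWriter.write(String)`, with the counter still reported through `UnzipResult`; reading from GCS rather than local disk needs a different way of opening the input, as noted below.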

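One more thing I noticed in your code: I assume you intend to run it on files located in GCS with the Cloud Dataflow runner, not just with the in-memory runner and local files. In that case, java.io.File will not transparently handle reads from and writes to GCS; you need to use GcsUtil.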
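One way to keep the unzip logic independent of java.io.File is to write it against NIO channels, so the same code works with local files, in-memory buffers, or GCS. This is a sketch under assumptions: to my understanding, Dataflow SDK 1.x `GcsUtil` offers `open(GcsPath)` returning a `SeekableByteChannel` and `create(GcsPath, String)` returning a `WritableByteChannel`, which could be plugged in here, but please verify against the GcsUtil Javadoc for your SDK version. `ChannelUnzipper` and `TargetOpener` are hypothetical names.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

final class ChannelUnzipper {
    // Hypothetical hook: opens a writable channel for one archive entry
    // (e.g. backed by GcsUtil.create(...) in a real pipeline).
    interface TargetOpener {
        WritableByteChannel open(String entryName) throws IOException;
    }

    // Unzips every regular-file entry of the archive read from source;
    // returns the number of files written. Closes source when done.
    static long unzip(ReadableByteChannel source, TargetOpener opener) throws IOException {
        long count = 0;
        byte[] buffer = new byte[8192];
        try (ZipInputStream zis = new ZipInputStream(Channels.newInputStream(source))) {
            for (ZipEntry ze = zis.getNextEntry(); ze != null; ze = zis.getNextEntry()) {
                if (ze.isDirectory()) {
                    continue;
                }
                // Closing the wrapping stream also closes the target channel.
                try (OutputStream os = Channels.newOutputStream(opener.open(ze.getName()))) {
                    int len;
                    while ((len = zis.read(buffer)) != -1) {
                        os.write(buffer, 0, len);
                    }
                }
                count++;
            }
        }
        return count;
    }
}
```

Because the method only sees channels, it can be tested with in-memory byte arrays and then wired to GCS without changes to the unzip loop.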


Hi, I get the same coder error when I use GcsPath instead of ZipInputStream. Thanks & BR, Philipp – bigdataclown


Oh, my apologies: GcsPath is not marked as Serializable (although I think it should be). It looks like you'll need to use 'String' to represent the path. – jkff