2017-09-05 71 views
0

我有一段代码可以提取Google云端存储中.ZIP文件的内容。它工作正常,但我需要使用此代码与将在运行时提供的文件路径(“gs://some_bucket/filename.zip”)。当我尝试使用运行值,我得到一个错误,如:使用ValueProvider作为Apache Beam中的路径提取zip内容

Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize [email protected] 
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53) 
    at org.apache.beam.sdk.util.SerializableUtils.ensureSerializable(SerializableUtils.java:83) 
    at org.apache.beam.sdk.io.Read$Bounded.<init>(Read.java:94) 
    at org.apache.beam.sdk.io.Read$Bounded.<init>(Read.java:89) 
    at org.apache.beam.sdk.io.Read.from(Read.java:48) 
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Read.expand(BigQueryIO.java:535) 
    at org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO$Read.expand(BigQueryIO.java:292) 
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:482) 
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:422) 
    at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44) 
    at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:164) 
    at BeamTest2.StarterPipeline.main(StarterPipeline.java:180) 
Caused by: java.io.NotSerializableException: org.apache.beam.sdk.Pipeline 
    at java.io.ObjectOutputStream.writeObject0(Unknown Source) 
    at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source) 
    at java.io.ObjectOutputStream.writeSerialData(Unknown Source) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source) 
    at java.io.ObjectOutputStream.writeObject0(Unknown Source) 
    at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source) 
    at java.io.ObjectOutputStream.writeSerialData(Unknown Source) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source) 
    at java.io.ObjectOutputStream.writeObject0(Unknown Source) 
    at java.io.ObjectOutputStream.defaultWriteFields(Unknown Source) 
    at java.io.ObjectOutputStream.writeSerialData(Unknown Source) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(Unknown Source) 
    at java.io.ObjectOutputStream.writeObject0(Unknown Source) 
    at java.io.ObjectOutputStream.writeObject(Unknown Source) 
    at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49) 
    ... 11 more 

的代码,我使用的是:

//Unzip incoming file 
     PCollection<TableRow> temp = p.apply(BigQueryIO.read().fromQuery(
     NestedValueProvider.of(
      options.getInputFile(), 
      new SerializableFunction<String, String>() { 
      private static final long serialVersionUID = 1L; 
      @Override 
      public String apply(String filepath) { 
       try{ 

       List<GcsPath> gcsPaths = util.expand(GcsPath.fromUri(filepath)); 
       LOG.info(gcsPaths+"FilesUnzipped"); 
        List<String> paths = new ArrayList<String>(); 

        for(GcsPath gcsp: gcsPaths){ 
         paths.add(gcsp.toString()); 
        } 
        p.apply(Create.of(paths)) 
         .apply(ParDo.of(new UnzipFN(filepath))); 

       } 
       catch(Exception e) 
       { 
        LOG.info("Exception caught while extracting ZIP"); 
       } 
       return ""; 
      } 
      })).usingStandardSql().withoutValidation()); 

UnzipFN类:

public class UnzipFN extends DoFn<String,Long>{ 
    private long filesUnzipped=0; 
    @ProcessElement 
    public void processElement(ProcessContext c){ 
     String p = c.element(); 
     GcsUtilFactory factory = new GcsUtilFactory(); 
     GcsUtil u = factory.create(c.getPipelineOptions()); 
     byte[] buffer = new byte[100000000]; 
     try{ 
      SeekableByteChannel sek = u.open(GcsPath.fromUri(p)); 
      InputStream is = Channels.newInputStream(sek); 
      BufferedInputStream bis = new BufferedInputStream(is); 
      ZipInputStream zis = new ZipInputStream(bis); 
      ZipEntry ze = zis.getNextEntry(); 
      while(ze!=null){ 
       LOG.info("Unzipping File {}",ze.getName()); 
       WritableByteChannel wri = u.create(GcsPath.fromUri("gs://bucket_location/" + ze.getName()), getType(ze.getName())); 
       OutputStream os = Channels.newOutputStream(wri); 
       int len; 
       while((len=zis.read(buffer))>0){ 
        os.write(buffer,0,len); 
       } 
       os.close(); 
       filesUnzipped++; 
       ze=zis.getNextEntry(); 


      } 
      zis.closeEntry(); 
      zis.close(); 

     } 
     catch(Exception e){ 
      e.printStackTrace(); 
     } 
    c.output(filesUnzipped); 
    System.out.println(filesUnzipped+"FilesUnzipped"); 
    LOG.info("FilesUnzipped"); 
    } 

    private String getType(String fName){ 
     if(fName.endsWith(".zip")){ 
      return "application/x-zip-compressed"; 
     } 
     else { 
      return "text/plain"; 
     } 
    } 
} 

如何处理这场景?

P.S. - .zip提取代码与BigQueryIO.read()无关。我只是用它作为黑客来读取运行时值。如果您有任何其他建议,请让我知道。

谢谢。

+0

NestedValueProvider中的SerializableFunction总是返回空字符串“” - 这是故意的吗?而应用UnzipFn产生的集合也被忽略。 – jkff

+0

另外它看起来像你试图添加新的图形步骤到你的NestedValueProvider的SerializableFunction内的管道。这是不可能的:管道首先被构建然后执行:你不能在运行时添加新的步骤。我很困惑你想做什么,所以我不确定如何帮助你做到这一点 - 请澄清你想要做的事情。 – jkff

+0

@jkff是的,这是故意的。所以基本上没有UnzipFN产生的收集。 UnzipFN的工作只是解压缩并提取其路径将在运行时提供的.zip文件的内容。所以我的意思是要问 - 如何解压缩GCS位置在运行时提供的文件? 如果除了我正在做的事情之外还有其他方式,请告诉我。 – rish0097

回答

1

如果我理解正确的,你有一个ValueProvider<String>包含filepattern,你正在扩大使用GcsUtil.expand()的filepattern,你想给一个函数(UnzipFn)适用于每个所产生的文件名。

目前的代码不会有几个原因的工作:

  • 你正在创建一个BigQueryIO.read().fromQuery()其中fromQuery()参数是ValueProvider总是返回空字符串(您NestedValueProvider,做了一堆东西后,总是返回空字符串"")。这会在运行时失败,因为查询不能为空。使用BigQueryIO作为黑客试图访问ValueProvider不是一个好主意 - 请参阅下文。
  • 您正在将函数中的步骤添加到函数中,以便从ValueProvider中提取值。该函数在管道正在运行时从worker中调用,以获取提供者的运行时值。在管道运行时,不可能从工作人员向管道添加步骤。
  • 你捕捉Pipeline对象为SerializableFunction关闭,并且它不能序列化,因为Pipeline不是Serializable - 因为没有合法的使用情况序列化Pipeline Java对象:它永远不会需要运到工人或对于跑步者来说,它只是一个临时构建器对象,用于在您的主程序中构建一些东西,您可以稍后调用.run()。另一方面,SerializableFunction运到工人,以便他们可以评估ValueProvider的当前值。

ValueProvider想象为仅在流水线运行时才具有值的占位符,而不是构建时的值 - 例如,你可以从DoFn内拨打provider.get()NestedValueProvider完全不会改变它 - 它只是简单地包装另一个ValueProvider,通常使用一些简单的转换逻辑,并且当您有ValueProvider<Something>但需要它作为ValueProvider<SomethingSlightlyDifferent>时,它将用作胶水代码。

的问题的关键是,你想只在运行时使用可用的值(你options.getInputFile()ValueProvider)做一些建设时 - 创建管道一步Create.of(paths)。在逻辑上不可能规避建设时ValueProvider的不可用性:ValueProvider专门用于表示在建造时尚未提供的值,因此它们作为占位符保留在管线描述中,并作为参数提供只有在管道运行时。你需要想出一个管道结构,其中输入文件是一个占位符,管道以你想要的方式处理它。

你可以这样说:

p.apply(Create.ofProvider(options.getInputFile(), StringUtf8Coder.of())) 
.apply(ParDo.of(new ExpandFn())) 
.apply(...fusion break...) 
.apply(ParDo.of(new UnzipFn())) 

其中ExpandFn将是一个DoFn,需要一个String并做你的GcsUtil.expand()的东西,并融合断见,例如执行JdbcIO.java

在Beam 2.2中(你可以在HEAD当前使用它),你不需要ExpandFn - 已经存在一个可以扩展文件模式等等的变换(例如,它可以递增地扩展文件模式并且继续观察新文件匹配它,在一个流媒体管道中)。所以你可以写得更简洁:

p.apply(FileIO.match().filepattern(options.getInputFile())) 
.apply(...fusion break...) 
.apply(ParDo.of(new UnzipFn())); 
+0

谢谢@jkff它的工作。我甚至没有进行融合突破。:) – rish0097

+0

很高兴它的工作,但请注意,没有融合突破,这段代码很可能会使用单线程并且没有并行性。 – jkff