2016-10-04

I have been struggling to get jobs with side outputs running, because I keep getting an "unable to serialize xxx" exception. The job will not run when I use OutputTags.

Even when I explicitly specified type coders I kept getting the same error, so I decided to write a simple job following this documentation:

https://cloud.google.com/dataflow/model/par-do#tags-for-side-outputs

To my surprise, I still get the same exception, and I now suspect I must be making some mistake (though I cannot figure it out myself). As far as the code goes, I tried to follow the example given in the documentation above.

Below, I post the source code along with the error message I get when I run it. I believe it is reproducible (change 'GCS_BUCKET' to any bucket you own, and create a main() method that invokes 'TestSideOutput' with args), and it would be good to know whether others can reproduce it. We are using JDK 8 and Dataflow SDK 1.7.0.

Note that the example in the documentation above uses an anonymous class extending DoFn; I tried that as well and got the same error message. The code below refactors that class into a named inner class ('Filter').

I also tried initializing the TupleTags without the curly braces ("{}"), since having them actually produces a warning, but that leads to a different exception (see the last code snippet in this post).
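For context on why the "{}" matters: TupleTag relies on the trick where an anonymous subclass records its generic supertype, so the element type survives erasure. The following is a minimal, self-contained sketch with plain JDK reflection (the `Tag` class here is an illustrative stand-in, not the actual SDK class):

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Toy stand-in for TupleTag<V>: an anonymous subclass ("new Tag<String>() {}")
// captures its generic supertype, while a plain instance loses it to erasure.
class Tag<V> {
    Type captured() {
        Type sup = getClass().getGenericSuperclass();
        if (sup instanceof ParameterizedType) {
            return ((ParameterizedType) sup).getActualTypeArguments()[0];
        }
        return null; // plain instance: type argument erased, nothing recoverable
    }
}

class ErasureDemo {
    public static void main(String[] args) {
        Tag<String> plain = new Tag<String>();   // erased
        Tag<String> anon = new Tag<String>() {}; // captured via anonymous subclass
        System.out.println(plain.captured()); // null
        System.out.println(anon.captured());  // class java.lang.String
    }
}
```

This is why dropping the braces leads the SDK to complain that it "cannot provide a coder for type variable V": without the anonymous subclass there is no runtime record of the element type.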

Here is the code I used:

package tmp.dataflow.experimental; 

import com.google.cloud.dataflow.sdk.Pipeline; 
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; 
import com.google.cloud.dataflow.sdk.io.TextIO; 
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; 
import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner; 
import com.google.cloud.dataflow.sdk.transforms.DoFn; 
import com.google.cloud.dataflow.sdk.transforms.ParDo; 
import com.google.cloud.dataflow.sdk.values.PCollection; 
import com.google.cloud.dataflow.sdk.values.PCollectionTuple; 
import com.google.cloud.dataflow.sdk.values.TupleTag; 
import com.google.cloud.dataflow.sdk.values.TupleTagList; 
import com.moloco.dataflow.DataflowConstants; 

public class TestSideOutput { 
    private TestOptions options; 
    private static final String GCS_BUCKET = "gs://dataflow-experimental/"; // Change to your bucket name 

    public TestSideOutput(String[] args) { 
    options = PipelineOptionsFactory.fromArgs(args).as(TestOptions.class); 
    options.setProject(DataflowConstants.PROJCET_NAME); 
    options.setStagingLocation(DataflowConstants.STAGING_BUCKET); 
    options.setRunner(BlockingDataflowPipelineRunner.class); 
    options.setJobName(options.getJob() + "-test-sideoutput"); 
    } 

    public void execute() { 
    Pipeline pipeline = Pipeline.create(options); 
    // 1. Read sample data. 
    PCollection<String> profiles = pipeline.apply(TextIO.Read.named("reading") 
     .from(GCS_BUCKET + "example/sample-data/sample-data*").withCoder(StringUtf8Coder.of())); 

    // 2. Create tags for outputs. 
    final TupleTag<String> mainTag = new TupleTag<String>() {}; 
    final TupleTag<String> sideTag = new TupleTag<String>() {}; 

    // 3. Apply ParDo with side output tags. 
    Filter filter = new Filter("DATAFLOW", sideTag); 
    PCollectionTuple results = 
     profiles.apply(ParDo.named("FilterByKeyword").withOutputTags(mainTag, TupleTagList.of(sideTag)).of(filter)); 

    // 4. Retrieve outputs. 
    PCollection<String> mainOutput = results.get(mainTag); 
    PCollection<String> sideOutput = results.get(sideTag); 

    // 5. Write to GCS. 
    mainOutput.apply(
     TextIO.Write.named("writingMain").to(GCS_BUCKET + "example/main-output/main").withCoder(StringUtf8Coder.of())); 
    sideOutput.apply(
     TextIO.Write.named("writingSide").to(GCS_BUCKET + "example/side-output/side").withCoder(StringUtf8Coder.of())); 

    // 6. Run pipeline. 
    pipeline.run(); 
    } 

    static class Filter extends DoFn<String, String> { 
    private static final long serialVersionUID = 0; 
    final TupleTag<String> sideTag; 
    String keyword; 

    public Filter(String keyword, TupleTag<String> sideTag) { 
     this.sideTag = sideTag; 
     this.keyword = keyword; 
    } 

    @Override 
    public void processElement(ProcessContext c) throws Exception { 
     String profile = c.element(); 
     if (profile.contains(keyword)) { 
     c.output(profile); 
     } else { 
     c.sideOutput(sideTag, profile); 
     } 
    } 
    } 
} 

Here are the command I used and the error/exception I got (the command just includes a few command-line arguments we use with our dataflow package; nothing special here, just to give you an idea):

dataflow-20161003.R3$ ./bin/dataflow --job=test-experimental-sideoutput --numWorkers=1 --date=0001-01-01 
Oct 04, 2016 12:37:34 AM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner fromOptions 
INFO: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 121 files. Enable logging at DEBUG level to see which files will be staged. 
Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize [email protected] 
     at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:54) 
     at com.google.cloud.dataflow.sdk.util.SerializableUtils.clone(SerializableUtils.java:91) 
     at com.google.cloud.dataflow.sdk.transforms.ParDo$BoundMulti.<init>(ParDo.java:959) 
     at com.google.cloud.dataflow.sdk.transforms.ParDo$UnboundMulti.of(ParDo.java:912) 
     at com.google.cloud.dataflow.sdk.transforms.ParDo$UnboundMulti.of(ParDo.java:908) 
     at tmp.dataflow.experimental.TestSideOutput.execute(TestSideOutput.java:41) 
     at com.moloco.dataflow.Main.main(Main.java:152) 
Caused by: java.io.NotSerializableException: tmp.dataflow.experimental.TestSideOutput 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) 
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
     at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:50) 
     ... 6 more 

As for the 'TestOptions' class: I don't think it's relevant, but here is the code anyway:

package tmp.dataflow.experimental; 

import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; 
import com.google.cloud.dataflow.sdk.options.Description; 
import com.google.cloud.dataflow.sdk.options.Validation; 

public interface TestOptions extends DataflowPipelineOptions { 
    @Description("Job") 
    @Validation.Required 
    String getJob(); 

    void setJob(String value); 

    @Description("Job suffix") 
    String getJobSuffix(); 

    void setJobSuffix(String value); 

    @Description("Date") 
    @Validation.Required 
    String getDate(); 

    void setDate(String value); 
} 

Finally, if I remove the curly braces "{}" when instantiating the TupleTags, I get the following exception (whereas the suggestions I found on Stack Overflow say I should use "{}" precisely to avoid this kind of problem):

Oct 04, 2016 12:43:56 AM com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner fromOptions 
INFO: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 122 files. Enable logging at DEBUG level to see which files will be staged. 
Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for FilterByKeyword.out1 [PCollection]. Correct one of the following root causes: 
    No Coder has been manually specified; you may do so using .setCoder(). 
    Inferring a Coder from the CoderRegistry failed: Cannot provide a coder for type variable V (declared by class com.google.cloud.dataflow.sdk.values.TupleTag) because the actual type is unknown due to erasure. If this error occurs for a side output of the producing ParDo, verify that the TupleTag for this output is constructed with proper type information (see TupleTag Javadoc) or explicitly set the Coder to use if this is not possible. 
    Using the default output Coder from the producing PTransform failed: Cannot provide a coder for type variable V (declared by class com.google.cloud.dataflow.sdk.values.TupleTag) because the actual type is unknown due to erasure. 
     at com.google.cloud.dataflow.sdk.values.TypedPValue.inferCoderOrFail(TypedPValue.java:195) 
     at com.google.cloud.dataflow.sdk.values.TypedPValue.getCoder(TypedPValue.java:48) 
     at com.google.cloud.dataflow.sdk.values.PCollection.getCoder(PCollection.java:137) 
     at com.google.cloud.dataflow.sdk.values.TypedPValue.finishSpecifying(TypedPValue.java:88) 
     at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:331) 
     at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:274) 
     at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:161) 
     at tmp.dataflow.experimental.TestSideOutput.execute(TestSideOutput.java:50) 
     at com.moloco.dataflow.Main.main(Main.java:152) 

EDIT: See the answer below, which resolves this issue by making execute() 'static'.

The code below is similar to what I originally posted, with two changes: wherever possible, I explicitly (and redundantly) specify a 'coder' for each PCollection, and the TupleTags are instantiated without curly braces.

I am not sure which approach (static execute() vs. this redundant-coders approach) is more appropriate.

public void execute() { 
    Pipeline pipeline = Pipeline.create(options); 
    // 1. Read sample data. 
    PCollection<String> profiles = pipeline.apply(TextIO.Read.named("reading") 
     .from(GCS_BUCKET + "example/sample-data/sample-data*").withCoder(StringUtf8Coder.of())); 

    // 2. Create tags for outputs. 
    final TupleTag<String> mainTag = new TupleTag<String>(); 
    final TupleTag<String> sideTag = new TupleTag<String>(); 

    // 3. Apply ParDo with side output tags. 
    Filter filter = new Filter("DATAFLOW", sideTag); 
    PCollectionTuple results = profiles.setCoder(StringUtf8Coder.of()) 
     .apply(ParDo.named("FilterByKeyword").withOutputTags(mainTag, TupleTagList.of(sideTag)).of(filter)); 

    // 4. Retrieve outputs. 
    PCollection<String> mainOutput = results.get(mainTag); 
    PCollection<String> sideOutput = results.get(sideTag); 

    // 5. Write to GCS. 
    mainOutput.setCoder(StringUtf8Coder.of()).apply(TextIO.Write.named("writingMain") 
     .to(GCS_BUCKET + "example/main-output-from-nonstatic/main").withCoder(StringUtf8Coder.of())); 
    sideOutput.setCoder(StringUtf8Coder.of()).apply(TextIO.Write.named("writingSide") 
     .to(GCS_BUCKET + "example/side-output-from-nonstatic/side").withCoder(StringUtf8Coder.of())); 

    // 6. Run pipeline. 
    pipeline.run(); 
    } 

Answer

The error you're getting is because your Filter fn refers to the TupleTag, which in turn (because it's a non-static anonymous class instantiated from a non-static function) refers to the enclosing TestSideOutput.

So the pipeline tries to serialize the TestSideOutput object, which is not serializable, as evidenced by the message: java.io.NotSerializableException: tmp.dataflow.experimental.TestSideOutput.

The root cause is that the method is not static. Making it static should fix the problem.
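The failure mode can be reproduced with plain JDK serialization, independent of the Dataflow SDK. In this illustrative sketch (the names are made up, not SDK classes), an anonymous object created in an instance method carries a hidden reference to the enclosing instance, while one created in a static method does not:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Minimal reproduction of the question's failure mode: an anonymous
// Serializable created inside an instance method captures `this`
// (the enclosing object), which is not Serializable itself.
class CaptureDemo {
    static class Tag implements Serializable {} // stand-in for TupleTag

    Tag fromInstanceMethod() {
        return new Tag() {}; // synthetic this$0 field points at CaptureDemo
    }

    static Tag fromStaticMethod() {
        return new Tag() {}; // no enclosing instance to drag along
    }

    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false; // NotSerializableException: CaptureDemo
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new CaptureDemo().fromInstanceMethod())); // false
        System.out.println(serializes(fromStaticMethod()));                     // true
    }
}
```

This mirrors why making execute() static works: the anonymous TupleTag subclasses then have no enclosing TestSideOutput instance to pull into the serialized closure.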


Indeed, your suggestion fixed the problem I was having. Thanks! On the other hand, we have another job with a non-static execute() method in which we apply a ParDo with side output tags, and it does not throw this exception (which is partly why I wrote the sample code above; it seemed strange to me). I can't really post that code right now, but I wonder whether there is another way to resolve this without making the execute() method static.


I have partly answered my own follow-up question (see the code snippet added at the end of my edited question). It appears possible to keep execute() non-static by explicitly and redundantly declaring coders wherever possible.