0

Apache Beam 模板:运行时上下文错误

我目前正在尝试基于 Apache Beam SDK v2.1.0,按照 Google 教程创建 Dataflow 模板。这是我的主类:

/**
 * Entry point: builds a pipeline whose first step reads Pub/Sub messages from the
 * subscription supplied by {@code DispatcherOptions#getSubscription()}.
 *
 * <p>This snippet only constructs the pipeline; it does not call {@code run()}.
 *
 * @param args command-line arguments, parsed into {@code DispatcherOptions}
 */
public static void main(String[] args) {

    // Parse the command-line arguments into validated DispatcherOptions
    DispatcherOptions dispatcherOptions =
        PipelineOptionsFactory
            .fromArgs(args)
            .withValidation()
            .as(DispatcherOptions.class);

    // Build the pipeline from those options
    Pipeline p = Pipeline.create(dispatcherOptions);

    // First step ("ReadMain"): read Pub/Sub messages from the configured subscription
    PCollection<PubsubMessage> incoming =
        p.apply(
            "ReadMain",
            PubsubIO.readMessages().fromSubscription(dispatcherOptions.getSubscription()));

}

如果我执行以下命令:

# Compile and run the main class on the DataflowRunner, passing the staging and template locations.
# Each continuation backslash must be the last character on its line.
mvn compile exec:java \
-Dexec.mainClass=com.example.myclass \
-Dexec.args="--runner=DataflowRunner \
       --project=[YOUR_PROJECT_ID] \
       --stagingLocation=gs://[YOUR_BUCKET_NAME]/staging \
       --templateLocation=gs://[YOUR_BUCKET_NAME]/templates/MyTemplate"

该命令在我使用以下方法时可以正常运行:

PubsubIO.readMessages().fromTopic(options.getTopic())); 

但如果改用

PubsubIO.readMessages().fromSubscription(options.getSubscription())); 

就会出现以下错误:

[WARNING] 
java.lang.RuntimeException: Not called from a runtime context. 
    at org.apache.beam.sdk.options.ValueProvider$RuntimeValueProvider.get(ValueProvider.java:223) 
    at org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131) 
    at org.apache.beam.sdk.options.ValueProvider$NestedValueProvider.get(ValueProvider.java:131) 
    at org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.getSubscription(PubsubUnboundedSource.java:1374) 
    at org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource$PubsubSource.<init>(PubsubUnboundedSource.java:1103) 
    at org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.expand(PubsubUnboundedSource.java:1407) 
    at org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSource.expand(PubsubUnboundedSource.java:110) 
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:514) 
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:454) 
    at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44) 
    at org.apache.beam.sdk.io.gcp.pubsub.PubsubIO$Read.expand(PubsubIO.java:730) 
    at org.apache.beam.sdk.io.gcp.pubsub.PubsubIO$Read.expand(PubsubIO.java:536) 
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:514) 
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:473) 
    at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:56) 
    at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:180) 
    at com.example.myclass.main(MyClass.java:43) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282) 
    at java.lang.Thread.run(Thread.java:748) 

回答