2017-04-02

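Apache Beam PubSub reader exceptions

I'm running a pipeline with a PubSub source and hitting some strange exceptions that crash it. It processes a few elements (3-10), and then one of the two error messages below suddenly appears. Neither gives me any clue about what I might be doing wrong, so I removed all transforms and kept only the source, but the problem persists. The only things I publish to PubSub are a few test strings. Any help is appreciated.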

Exception 1:

[WARNING] 
java.lang.reflect.InvocationTargetException 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293) 
     at java.lang.Thread.run(Thread.java:724) 
Caused by: java.lang.NullPointerException 
     at org.apache.beam.sdk.io.PubsubUnboundedSource$PubsubReader.ackBatch(PubsubUnboundedSource.java:640) 
     at org.apache.beam.sdk.io.PubsubUnboundedSource$PubsubCheckpoint.finalizeCheckpoint(PubsubUnboundedSource.java:313) 
     at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.getReader(UnboundedReadEvaluatorFactory.java:174) 
     at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:127) 
     at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:139) 
     at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:107) 
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) 
     at java.util.concurrent.FutureTask.run(FutureTask.java:262) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 

Exception 2:

[WARNING] 
java.lang.reflect.InvocationTargetException 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293) 
     at java.lang.Thread.run(Thread.java:724) 
Caused by: java.lang.IllegalStateException: Cannot finalize a restored checkpoint 
     at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:444) 
     at org.apache.beam.sdk.io.PubsubUnboundedSource$PubsubCheckpoint.finalizeCheckpoint(PubsubUnboundedSource.java:293) 
     at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.finishRead(UnboundedReadEvaluatorFactory.java:205) 
     at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:142) 
     at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:139) 
     at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:107) 
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) 
     at java.util.concurrent.FutureTask.run(FutureTask.java:262) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 

Basic code:

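// Note: create() does not parse the command-line arguments, so the --runner
// flag in exec.args has no effect and the default DirectRunner is used
// (consistent with the org.apache.beam.runners.direct frames in the traces).
PipelineOptions options = PipelineOptionsFactory.create(); 
PubsubOptions dataflowOptions = options.as(PubsubOptions.class); 
dataflowOptions.setStreaming(true); 

Pipeline p = Pipeline.create(options); 

// Read UTF-8 strings from the subscription; no other transforms are applied.
p.apply(PubsubIO.<String>read().subscription("my-subscription") 
    .withCoder(StringUtf8Coder.of())); 

p.run();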

Execution:

mvn compile exec:java -Dexec.mainClass=my.package.SalesTransactions -Dexec.args="--runner BlockingDataflowRunner --project=my-project --tempLocation=gs://my-project/tmp" 

Answer


This issue is caused by a bug in the DirectRunner (BEAM-1656) combined with a precondition in PubsubCheckpoint.

The answer to "Apache Beam: PubsubReader fails with NPE" has more information about the bug and how to work around it. Thanks!
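To make the two traces easier to relate to that explanation, here is a hypothetical, heavily simplified pure-Java model. It is not Beam's code: the classes Reader and Checkpoint and the helpers restore and tryFinalize are invented for illustration. The model assumes a checkpoint keeps only a transient reference to its reader, so a checkpoint that has been serialized and restored has no reader and trips the precondition (compare Exception 2). It also assumes the NullPointerException in ackBatch comes from acking through a reader whose client was already closed (compare Exception 1); that second point is a guess, not confirmed by the thread.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of a checkpoint/reader lifecycle.
// Names echo the stack traces, but this is NOT Beam's implementation.
public class CheckpointModel {

  // Stand-in for a reader that owns a client connection.
  static class Reader {
    // Stand-in for the Pub/Sub client: records acked IDs; null once closed.
    List<String> client = new ArrayList<>();

    void ackBatch(List<String> ackIds) {
      // No null check: acking through a closed reader throws NullPointerException.
      client.addAll(ackIds);
    }

    void close() {
      client = null;
    }
  }

  // Stand-in for a checkpoint: ack IDs survive serialization, the reader does not.
  static class Checkpoint implements Serializable {
    private static final long serialVersionUID = 1L;
    final ArrayList<String> ackIds;
    transient Reader reader; // null after deserialization ("restored")

    Checkpoint(List<String> ackIds, Reader reader) {
      this.ackIds = new ArrayList<>(ackIds);
      this.reader = reader;
    }

    void finalizeCheckpoint() {
      // Precondition: only a checkpoint still attached to its reader can ack.
      if (reader == null) {
        throw new IllegalStateException("Cannot finalize a restored checkpoint");
      }
      reader.ackBatch(ackIds);
    }
  }

  // Serializes and deserializes a checkpoint, as a runner might when persisting it.
  static Checkpoint restore(Checkpoint c) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(c);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      return (Checkpoint) in.readObject();
    }
  }

  // Finalizes the checkpoint; returns "ok" or the simple name of the exception thrown.
  static String tryFinalize(Checkpoint c) {
    try {
      c.finalizeCheckpoint();
      return "ok";
    } catch (RuntimeException e) {
      return e.getClass().getSimpleName();
    }
  }

  public static void main(String[] args) throws Exception {
    List<String> ids = Arrays.asList("ack-1", "ack-2");

    Reader live = new Reader();
    System.out.println(tryFinalize(new Checkpoint(ids, live))); // ok

    Reader closed = new Reader();
    Checkpoint stale = new Checkpoint(ids, closed);
    closed.close();
    System.out.println(tryFinalize(stale)); // NullPointerException

    Checkpoint restored = restore(new Checkpoint(ids, new Reader()));
    System.out.println(tryFinalize(restored)); // IllegalStateException
  }
}
```

In this model, finalizing is only safe while the checkpoint is still attached to a live reader; a runner that finalizes a checkpoint at any other point will hit one of the two failures.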


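Thanks for the answer. I've updated to the latest snapshot, and it looked promising at first because the error no longer appears right away, but after a while the NullPointerException above still occurs. – jimmy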


https://github.com/apache/beam/pull/2368 fixes a bug in PubSub and should be merged soon. Sorry for the inconvenience. –