2017-06-14 139 views

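Google Cloud Dataflow streaming: error while fetching side input

Sometimes I get the exception below while running a streaming Dataflow job: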

exception: "java.lang.RuntimeException: Exception while fetching side input: 
at com.google.cloud.dataflow.sdk.runners.worker.StateFetcher.fetchSideInput(StateFetcher.java:184) 
at com.google.cloud.dataflow.sdk.runners.worker.StreamingModeExecutionContext.fetchSideInput(StreamingModeExecutionContext.java:175) 
at com.google.cloud.dataflow.sdk.runners.worker.StreamingModeExecutionContext.access$400(StreamingModeExecutionContext.java:56) 
at com.google.cloud.dataflow.sdk.runners.worker.StreamingModeExecutionContext$StepContext.issueSideInputFetch(StreamingModeExecutionContext.java:401) 
at com.google.cloud.dataflow.sdk.runners.worker.StreamingSideInputFetcher.getReadyWindows(StreamingSideInputFetcher.java:135) 
at com.google.cloud.dataflow.sdk.runners.worker.StreamingSideInputDoFnRunner.startBundle(StreamingSideInputDoFnRunner.java:49) 
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.reallyStartBundle(SimpleParDoFn.java:175) 
at com.google.cloud.dataflow.sdk.runners.worker.SimpleParDoFn.startBundle(SimpleParDoFn.java:117) 
at com.google.cloud.dataflow.sdk.runners.worker.ForwardingParDoFn.startBundle(ForwardingParDoFn.java:36) 
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.start(ParDoOperation.java:45) 
at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:69) 
at com.google.cloud.dataflow.sdk.runners.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:719) 
at com.google.cloud.dataflow.sdk.runners.worker.StreamingDataflowWorker.access$600(StreamingDataflowWorker.java:95) 
at com.google.cloud.dataflow.sdk.runners.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:538) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 
Caused by: com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: Duplicate values for 2059 
at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2207) 
at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache.get(LocalCache.java:3953) 
at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4790) 
at com.google.cloud.dataflow.sdk.runners.worker.StateFetcher.fetchSideInput(StateFetcher.java:175) 
... 16 more 
Caused by: java.lang.IllegalArgumentException: Duplicate values for 2059 
at com.google.cloud.dataflow.sdk.util.PCollectionViews$MapPCollectionView.fromElements(PCollectionViews.java:291) 
at com.google.cloud.dataflow.sdk.util.PCollectionViews$MapPCollectionView.fromElements(PCollectionViews.java:273) 
at com.google.cloud.dataflow.sdk.util.PCollectionViews$PCollectionViewBase.fromIterableInternal(PCollectionViews.java:368) 
at com.google.cloud.dataflow.sdk.runners.worker.StateFetcher$2.call(StateFetcher.java:152) 
at com.google.cloud.dataflow.sdk.runners.worker.StateFetcher$2.call(StateFetcher.java:104) 
at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4793) 
at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3542) 
at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2323) 
at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2286) 
at com.google.cloud.dataflow.worker.repackaged.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2201) 
... 19 more 

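Dataflow worker machine type: n1-standard-4

Worker cache memory (MB): 2048

Dataflow main input: a PubSub subscription. I am creating a sideInput from BT and passing this sideInput to multiple transforms. The size of my sideInput is less than 100 MB.

Thanks.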

Answer


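The error means that multiple values were encountered for the same key (2059), which violates what a Map-valued side input expects: at most one value per key. This can happen in streaming, especially if the same value is triggered more than once. If you use a Multimap instead, you should be able to retrieve all the values associated with a given key.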
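
To illustrate the difference, here is a minimal plain-Java sketch. It does not use the Dataflow SDK and is not the SDK's implementation; the class `SideInputSketch` and its methods `toMap` and `toMultimap` are hypothetical names made up for this example. It only mimics the behaviour described above: a Map-style view rejects a second value for a key, while a Multimap-style view keeps every value. In the SDK itself, the corresponding choice is between `View.asMap()` and `View.asMultimap()` when building the side input.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; no Dataflow SDK classes are used here.
final class SideInputSketch {

    // Mimics a Map-valued view: a second value for the same key is an error.
    static <K, V> Map<K, V> toMap(List<Map.Entry<K, V>> elements) {
        Map<K, V> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> e : elements) {
            if (result.containsKey(e.getKey())) {
                throw new IllegalArgumentException("Duplicate values for " + e.getKey());
            }
            result.put(e.getKey(), e.getValue());
        }
        return result;
    }

    // Mimics a Multimap-valued view: every value seen for a key is kept.
    static <K, V> Map<K, List<V>> toMultimap(List<Map.Entry<K, V>> elements) {
        Map<K, List<V>> result = new LinkedHashMap<>();
        for (Map.Entry<K, V> e : elements) {
            result.computeIfAbsent(e.getKey(), k -> new ArrayList<>()).add(e.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        // Two elements share key 2059, as if the same key had fired twice.
        List<Map.Entry<Integer, String>> elements = new ArrayList<>();
        elements.add(new AbstractMap.SimpleImmutableEntry<>(2059, "first firing"));
        elements.add(new AbstractMap.SimpleImmutableEntry<>(2059, "second firing"));

        // The multimap-style view accepts the duplicate key.
        System.out.println(toMultimap(elements));

        // The map-style view fails on it.
        try {
            toMap(elements);
        } catch (IllegalArgumentException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```

With a Multimap-style side input, the consuming transform has to decide which of the values for a key to use (for example the most recent one), since it now receives a collection per key rather than a single value.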