2017-09-15 83 views
0

我创建一个测试,看看我的弗林克模式功能超时正确。我使用这个弗林克斯佩克特和我有以下测试用例:JunitTest的数据流中与弗林克斯佩克特

@Test 
public void SameDoor_TwoStatuses_OneSecondTimeoutPattern() { 
    // Arrange 
    long now = new Date().getTime(); 
    DoorEvent event1 = new DoorEvent(); 
    event1.setId(123); 
    event1.getDoor().setId(1); 
    event1.getDoor().setStatus("statusaaaaaa"); 
    event1.setTimestamp(now); 

    EventTimeInputBuilder<DoorEvent> builder = EventTimeInputBuilder.startWith(event1, event1.getTimestamp()); 
    DataStream<DoorEvent> stream = createTestStream(builder).assignTimestampsAndWatermarks(new TestTimestampExtractor<DoorEvent>()); 

    // Act 
    Pattern<DoorEvent, ?> pattern = StatusNotFollowedByAnotherStatusPattern.getPatternForSameDoor(1, "firstevent", "statusaaaaaa","secondevent", "status2"); 
    PatternStream<DoorEvent> pStream = CEP.pattern(stream, pattern); 

    DataStream<Either<Integer,Tuple2<Integer,Integer>>> patterns = pStream.select(getEventIdOfTimeoutEvent(),selectEventIdsOfPatterns()).forward(); 
    patterns.print(); // prints Left(123) 

    ExpectedRecords<Either<Integer,Tuple2<Integer,Integer>>> expectedRecords = 
     new ExpectedRecords<Either<Integer,Tuple2<Integer,Integer>>>() 
      .expect(new Left<Integer, Tuple2<Integer,Integer>>(123)); 

    expectedRecords.refine().sameFrequency(); 

    // Assert 
    assertStream(patterns, expectedRecords); 
} 

private PatternSelectFunction<DoorEvent, Tuple2<Integer, Integer>> selectEventIdsOfPatterns(){ 
    return new PatternSelectFunction<DoorEvent, Tuple2<Integer,Integer>>() { 
     private static final long serialVersionUID = 3830508947015151715L; 
     @Override 
     public Tuple2<Integer,Integer> select(Map<String, List<DoorEvent>> pattern) throws Exception { 
      Tuple2<Integer,Integer> t = new Tuple2<Integer,Integer>(); 
      t.f0 = pattern.get("firstevent").get(0).getId(); 
      t.f1 = pattern.get("secondevent").get(0).getId(); 
      return t; 
     } 
    }; 
} 

private PatternTimeoutFunction<DoorEvent, Integer> getEventIdOfTimeoutEvent(){ 
    return new PatternTimeoutFunction<DoorEvent, Integer>() { 
     private static final long serialVersionUID = 1L; 

     @Override 
     public Integer timeout(Map<String, List<DoorEvent>> arg0, long arg1) throws Exception { 
      int id = arg0.get("firstevent").get(0).getId(); 
      System.out.println("Timeout triggered on eventstatus " + arg0.get("firstevent").get(0).getDoor().getStatus()); 
      return id; 
     } 

    }; 
} 

我的代码不会打印status statusaaaaaa,这是我的第一个事件的状态是在模式,在patternTimeoutFunction。在定义的时间段内没有检测到第二个状态,所以超时被调用并向模式流添加一个整数。我如何在ExpectedRecords中说我预计值为123的左边?

编辑
我公司目前拥有的错误是:

org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:933) 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:876) 
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) 
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) 
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: java.lang.RuntimeException: Exception occurred while processing valve output watermark: 
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289) 
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173) 
    at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108) 
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188) 
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69) 
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263) 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702) 
    at java.lang.Thread.run(Unknown Source) 
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator 
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530) 
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503) 
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483) 
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) 
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869) 
    at org.apache.flink.cep.operator.TimeoutKeyedCEPPatternOperator.emitTimedOutSequences(TimeoutKeyedCEPPatternOperator.java:77) 
    at org.apache.flink.cep.operator.TimeoutKeyedCEPPatternOperator.advanceTime(TimeoutKeyedCEPPatternOperator.java:68) 
    at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:242) 
    at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275) 
    at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107) 
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946) 
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286) 
    ... 7 more 
Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator 
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:530) 
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503) 
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483) 
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:575) 
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:536) 
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891) 
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869) 
    at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) 
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528) 
    ... 18 more 
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: Type extraction is not possible on Either type as it does not contain information about the 'left' type. 
    at org.apache.flink.api.java.typeutils.EitherTypeInfoFactory.createTypeInfo(EitherTypeInfoFactory.java:37) 
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoFromFactory(TypeExtractor.java:1233) 
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForObject(TypeExtractor.java:2054) 
    at org.apache.flink.api.java.typeutils.TypeExtractor.getForObject(TypeExtractor.java:2044) 
    at io.flinkspector.datastream.functions.TestSink.invoke(TestSink.java:82) 
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:41) 
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528) 
    ... 26 more 

回答

0

的问题是在spectors TestSink功能的错误。 TestSink函数尝试在运行时提取Left的通用参数,这是不可能的。相反,当它被实例化以创建正确类型的串行器时,有必要将这些信息传递给TestSink函数。请在Github存储库中打开相应的问题,让开发人员知道。

+0

他们的github上做出的一个问题。 Thx回复:) –