2017-09-14

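Apache Beam TestStream with late elements

I am trying to use TestStream to experiment and see how late elements are handled, but I am getting some very interesting and confusing behavior.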

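Specifically, I add an element "2" with a timestamp inside a window (windowTwo), then advance the watermark past the end of that window but not past endOfWindow + lateness, and finally add another element "3" with a timestamp inside the same window.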

The interesting and confusing part is this: I expect to see the sum of all the elements in windowTwo, but the test fails with

Expected: iterable over [<5>] in any order but: Not matched: <2>

However, if I change the expected sum from 5 to 2, it still fails, this time with

Expected: iterable over [<2>] in any order but: Not matched: <5>

What is going on here?

import org.apache.beam.sdk.coders.BigEndianIntegerCoder; 
import org.apache.beam.sdk.testing.NeedsRunner; 
import org.apache.beam.sdk.testing.PAssert; 
import org.apache.beam.sdk.testing.TestPipeline; 
import org.apache.beam.sdk.testing.TestStream; 
import org.apache.beam.sdk.transforms.PTransform; 
import org.apache.beam.sdk.transforms.Sum; 
import org.apache.beam.sdk.transforms.windowing.*; 
import org.apache.beam.sdk.values.PCollection; 
import org.apache.beam.sdk.values.TimestampedValue; 
import org.joda.time.Duration; 
import org.joda.time.Instant; 
import org.junit.Rule; 
import org.junit.Test; 
import org.junit.experimental.categories.Category; 

public class BeamAppTest { 
    @Rule 
    public final transient TestPipeline pipeline = TestPipeline.create(); 

    @Test 
    @Category(NeedsRunner.class) 
    public void testApp() { 
     final Duration windowLengthMin = Duration.standardMinutes(10); 
     final Duration latenessMin = Duration.standardMinutes(5); 
     final Duration oneMin = Duration.standardMinutes(1); 

     final Instant windowOneStart = new Instant(0L).plus(Duration.standardMinutes(20)); 
     final Instant windowOneEnd = windowOneStart.plus(windowLengthMin); 
     final IntervalWindow windowOne = new IntervalWindow(windowOneStart, windowOneEnd); 

     final Instant windowTwoStart = windowOneEnd; 
     final Instant windowTwoEnd = windowTwoStart.plus(windowLengthMin); 
     final IntervalWindow windowTwo = new IntervalWindow(windowTwoStart, windowTwoEnd); 

     TestStream<Integer> testStream = TestStream.create(BigEndianIntegerCoder.of()) 
      .addElements(TimestampedValue.of(1, windowOneStart.plus(oneMin))) // early window one 
      .advanceWatermarkTo(windowOneEnd)         // watermark passes window one 
      .addElements(TimestampedValue.of(2, windowTwoStart.plus(oneMin))) // early window two 
      .advanceWatermarkTo(windowTwoEnd.plus(latenessMin).minus(oneMin)) // watermark passes end of window two, but not end + allowed lateness 
      .addElements(TimestampedValue.of(3, windowTwoStart.plus(oneMin))) // late window two 
      .advanceProcessingTime(oneMin.plus(oneMin)) 
      .advanceWatermarkToInfinity(); 

     PCollection<Integer> means = pipeline.apply(testStream).apply(new CalSum(windowLengthMin, latenessMin)); 

     PAssert.that(means) 
      .inWindow(windowOne) 
      .containsInAnyOrder(1); 

     PAssert.that(means) 
      .inWindow(windowTwo) 
      .containsInAnyOrder(2); // change the 2 to 5 here to see magic!!! 

     pipeline.run().waitUntilFinish(); 
    } 

    static class CalSum extends PTransform<PCollection<Integer>, PCollection<Integer>> { 
     private final Duration WINDOW_LENGTH_MIN; 
     private final Duration LATENESS_MIN; 

     CalSum(Duration windowLengthMin, Duration latenessMin) { 
      WINDOW_LENGTH_MIN = windowLengthMin; 
      LATENESS_MIN = latenessMin; 
     } 

     @Override 
     public PCollection<Integer> expand(PCollection<Integer> input) { 
      return input 
       .apply(Window 
        .<Integer>into(FixedWindows.of(WINDOW_LENGTH_MIN)) 
        .withAllowedLateness(LATENESS_MIN) 
        .accumulatingFiredPanes() // accumulating trigger 
        .triggering(AfterWatermark.pastEndOfWindow() // trigger at end of window 
         .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane() 
          .plusDelayOf(Duration.standardMinutes(2))) // trigger every 2 min within the window 
         .withLateFirings(AfterProcessingTime.pastFirstElementInPane() 
          .plusDelayOf(Duration.standardMinutes(1))))) // trigger every 1 min after the window 
       .apply(Sum.integersGlobally().withoutDefaults()); 
     } 
    } 
} 

Answer


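As written, given the times at which the elements and the watermark arrive, the output for windowTwo contains two elements: 2 and 5. This is a result of the triggering you have configured. Input 2 has a timestamp of windowTwoStart plus one minute and arrives while the watermark is still before the end of windowTwo, so it is emitted on time. Then the watermark advances past the end of windowTwo, which causes the AfterWatermark trigger to fire and emit an on-time pane whose sum is 2.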
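The on-time / late / droppable distinction can be sketched in plain Java to make the timeline concrete. This is a hypothetical helper, not part of the Beam API: it works in whole minutes, plugs in the question's numbers (windowTwo is [30, 40) minutes, allowed lateness is 5 minutes), and approximates Beam's rule that an element becomes droppable once the watermark has passed the end of its window plus the allowed lateness.

```java
/**
 * Hypothetical helper (not Beam code) that approximates how an element is
 * classified against the watermark for fixed windows [start, start + length).
 * All times are whole minutes to keep the arithmetic readable.
 */
public class LatenessCheck {

    enum Status { ON_TIME, LATE, DROPPABLE }

    /** Start of the fixed window that contains the given timestamp. */
    static long windowStart(long timestampMin, long windowLengthMin) {
        return timestampMin - Math.floorMod(timestampMin, windowLengthMin);
    }

    static Status classify(long timestampMin, long watermarkMin,
                           long windowLengthMin, long allowedLatenessMin) {
        long windowEnd = windowStart(timestampMin, windowLengthMin) + windowLengthMin;
        if (watermarkMin < windowEnd) {
            return Status.ON_TIME;   // watermark has not reached the end of the window
        }
        if (watermarkMin < windowEnd + allowedLatenessMin) {
            return Status.LATE;      // past the window, but within allowed lateness
        }
        return Status.DROPPABLE;     // past end of window + allowed lateness
    }

    public static void main(String[] args) {
        long windowLength = 10, lateness = 5;
        // Element "2": timestamp 31 min, watermark still at 30 min (end of window one).
        System.out.println(classify(31, 30, windowLength, lateness)); // ON_TIME
        // Element "3": timestamp 31 min, watermark at 40 + 5 - 1 = 44 min.
        System.out.println(classify(31, 44, windowLength, lateness)); // LATE
    }
}
```

With these numbers, element 3 lands one minute short of the lateness horizon, which is why it is kept and fires a late pane instead of being dropped.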

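After that, input 3 arrives. This happens after the watermark has passed the end of the window the element belongs to (so the element is late), but before the watermark has passed the end of the window plus the allowed lateness (so the element is not droppable). Therefore, when the watermark advances again, the element is emitted together with the earlier 2 (because you selected accumulating mode), and the two are combined into the 5 you observed.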
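The effect of the accumulation mode on the emitted sums can be illustrated with a small simulation. This is a simplified model, not Beam code: the hypothetical `emittedSums` method treats each trigger firing as a batch of newly arrived elements and emits one sum per firing, either carrying earlier elements forward (like `accumulatingFiredPanes()`) or not (like `discardingFiredPanes()`).

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model (not Beam code) of a per-window Sum that emits one value
 * per trigger firing under the two accumulation modes.
 */
public class PaneAccumulation {

    /**
     * @param firings      elements that arrived before each firing, e.g. [[2], [3]]
     *                     for one on-time firing followed by one late firing
     * @param accumulating true to mimic accumulatingFiredPanes(),
     *                     false to mimic discardingFiredPanes()
     * @return the value emitted by each firing, in order
     */
    static List<Integer> emittedSums(List<List<Integer>> firings, boolean accumulating) {
        List<Integer> emitted = new ArrayList<>();
        int carried = 0; // running total kept across firings in accumulating mode
        for (List<Integer> newElements : firings) {
            int paneSum = newElements.stream().mapToInt(Integer::intValue).sum();
            if (accumulating) {
                carried += paneSum;
                emitted.add(carried); // pane includes every element seen so far
            } else {
                emitted.add(paneSum); // pane includes only the new elements
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<List<Integer>> windowTwo = List.of(List.of(2), List.of(3));
        System.out.println(emittedSums(windowTwo, true));  // [2, 5]
        System.out.println(emittedSums(windowTwo, false)); // [2, 3]
    }
}
```

Under this model, switching the question's pipeline to discarding mode would be expected to produce 2 and then 3 for windowTwo instead of 2 and then 5.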

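The on-time pane (which you can match with PAssert.that(means).inOnTimePane(windowTwo)) contains only the value 2; over the whole lifetime of the window, both 2 and 5 are produced, so the inWindow assertion is checked against [2, 5]. That is why expecting only 5 reports <2> as unmatched, and expecting only 2 reports <5> as unmatched.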
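The difference between the two assertion scopes can be modeled as a filter over the window's panes. This is a simplified sketch, not the PAssert implementation: the hypothetical `inWindow` and `inOnTimePane` methods below only mirror which pane values each scope would compare. Applied to the question's test, it suggests either `inWindow(windowTwo).containsInAnyOrder(2, 5)` or `inOnTimePane(windowTwo).containsInAnyOrder(2)`.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model (not the PAssert implementation) of which pane values
 * each assertion scope sees for a single window.
 */
public class PaneScopes {

    enum Timing { EARLY, ON_TIME, LATE }

    /** One emitted pane: its timing relative to the watermark and its value. */
    static final class Pane {
        final Timing timing;
        final int value;

        Pane(Timing timing, int value) {
            this.timing = timing;
            this.value = value;
        }
    }

    /** Roughly what inWindow(w) compares: values from every pane of the window. */
    static List<Integer> inWindow(List<Pane> panes) {
        List<Integer> values = new ArrayList<>();
        for (Pane p : panes) {
            values.add(p.value);
        }
        return values;
    }

    /** Roughly what inOnTimePane(w) compares: only the ON_TIME pane's values. */
    static List<Integer> inOnTimePane(List<Pane> panes) {
        List<Integer> values = new ArrayList<>();
        for (Pane p : panes) {
            if (p.timing == Timing.ON_TIME) {
                values.add(p.value);
            }
        }
        return values;
    }

    public static void main(String[] args) {
        // Panes produced for windowTwo in the question's test.
        List<Pane> windowTwo = List.of(new Pane(Timing.ON_TIME, 2), new Pane(Timing.LATE, 5));
        System.out.println(inWindow(windowTwo));     // [2, 5]
        System.out.println(inOnTimePane(windowTwo)); // [2]
    }
}
```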