我试图使用TestStream来实验,看看如何处理后期元素,但得到了一些非常有趣和令人困惑的行为。Apache Stream TestStream与延迟元素
具体来说,我在窗口(windowTwo)中添加一个带有时间戳的元素“2”,然后在窗口结束之后但在endOfWindow + Lateness之前移动水印,最后添加另一个元素“3 “窗口内有时间戳。
有趣和令人困惑的事情是:我希望看到在windowTwo所有元素的总和,但它失败,并说
预计:在迭代过[< 5>]任何命令, 不过:不匹配:< 2>
但是,如果我改变预期的总和从至,它仍然失败,并说
预计:迭代超过[< 2>]以任何顺序, 不过:不匹配:< 5>
这是怎么回事???
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.*;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
public class BeamAppTest {
@Rule
public final transient TestPipeline pipeline = TestPipeline.create();
@Test
@Category(NeedsRunner.class)
public void testApp() {
final Duration windowLengthMin = Duration.standardMinutes(10);
final Duration latenessMin = Duration.standardMinutes(5);
final Duration oneMin = Duration.standardMinutes(1);
final Instant windowOneStart = new Instant(0L).plus(Duration.standardMinutes(20));
final Instant windowOneEnd = windowOneStart.plus(windowLengthMin);
final IntervalWindow windowOne = new IntervalWindow(windowOneStart, windowOneEnd);
final Instant windowTwoStart = windowOneEnd;
final Instant windowTwoEnd = windowTwoStart.plus(windowLengthMin);
final IntervalWindow windowTwo = new IntervalWindow(windowTwoStart, windowTwoEnd);
TestStream<Integer> testStream = TestStream.create(BigEndianIntegerCoder.of())
.addElements(TimestampedValue.of(1, windowOneStart.plus(oneMin))) // early window one
.advanceWatermarkTo(windowOneEnd) // watermark passes window one
.addElements(TimestampedValue.of(2, windowTwoStart.plus(oneMin))) // early window two
.advanceWatermarkTo(windowTwoEnd.plus(latenessMin).minus(oneMin)) // water mark passes window two
.addElements(TimestampedValue.of(3, windowTwoStart.plus(oneMin))) // late window two
.advanceProcessingTime(oneMin.plus(oneMin))
.advanceWatermarkToInfinity();
PCollection<Integer> means = pipeline.apply(testStream).apply(new CalSum(windowLengthMin, latenessMin));
PAssert.that(means)
.inWindow(windowOne)
.containsInAnyOrder(1);
PAssert.that(means)
.inWindow(windowTwo)
.containsInAnyOrder(2); // change the 2 to 5 here to see magic!!!
pipeline.run().waitUntilFinish();
}
static class CalSum extends PTransform<PCollection<Integer>, PCollection<Integer>> {
private final Duration WINDOW_LENGTH_MIN;
private final Duration LATENESS_MIN;
CalSum(Duration windowLengthMin, Duration latenessMin) {
WINDOW_LENGTH_MIN = windowLengthMin;
LATENESS_MIN = latenessMin;
}
@Override
public PCollection<Integer> expand(PCollection<Integer> input) {
return input
.apply(Window
.<Integer>into(FixedWindows.of(WINDOW_LENGTH_MIN))
.withAllowedLateness(LATENESS_MIN)
.accumulatingFiredPanes() // accumulating trigger
.triggering(AfterWatermark.pastEndOfWindow() // trigger at end of window
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(2))) // trigger every 2 min within the window
.withLateFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(1))))) // trigger every 1 min after the window
.apply(Sum.integersGlobally().withoutDefaults());
}
}
}