I sometimes get the following IllegalStateException while running a pipeline on the Dataflow runner:
(5d8e3f411b5a4ccb): java.lang.IllegalStateException: TimestampCombiner moved element from 2017-09-25T13:53:08.725Z to earlier time 2017-09-25T13:53:08.718Z for window [2017-09-25T13:53:08.088Z..2017-09-25T13:53:08.719Z)
What could be the cause?
The WindowFn code is simple:
public class BQTablePartitionWindowFn extends NonMergingWindowFn<Object, IntervalWindow> {

    /**
     *
     */
    private static final long serialVersionUID = 1L;

    private IntervalWindow assignWindow(AssignContext context) {
        TableRow tableRow = (TableRow) context.element();
        String timestamp = tableRow.get(BQConstants.LOG_TIME).toString();
        String currentTime = DateUtil.getFormatedDate(new Date());
        DateTimeFormatter formatter = DateTimeFormat.forPattern(CommonConstants.DATE_FORMAT_YYYYMMDD_HHMMSS_SSS)
                .withZoneUTC();
        Instant start_point = Instant.parse(timestamp, formatter);
        Instant end_point = Instant.parse(currentTime, formatter);
        return new IntervalWindow(start_point, end_point);
    }

    @Override
    public Coder<IntervalWindow> windowCoder() {
        return IntervalWindow.getCoder();
    }

    @Override
    public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception {
        return Arrays.asList(assignWindow(c));
    }

    @Override
    public boolean isCompatible(WindowFn<?, ?> other) {
        return false;
    }

    @Override
    public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
        throw new IllegalArgumentException(
                "Attempted to get side input window for GlobalWindow from non-global WindowFn");
    }
}
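For reference, the timestamps in the exception message can be checked with plain java.time arithmetic. This is only a sketch, not Beam code: it assumes the last valid timestamp of an interval window is one millisecond before its end, and that the element is being shifted to that end-of-window timestamp. The class and method names are hypothetical.

```java
import java.time.Instant;

public class WindowTimestampCheck {

    // Assumption: for a window [start, end), the last timestamp inside
    // the window is one millisecond before end.
    static Instant maxTimestamp(Instant windowEnd) {
        return windowEnd.minusMillis(1);
    }

    // True when shifting the element to the window's max timestamp
    // would move it to an earlier time than it already has.
    static boolean movesBackwards(Instant elementTimestamp, Instant windowEnd) {
        return maxTimestamp(windowEnd).isBefore(elementTimestamp);
    }

    public static void main(String[] args) {
        // Values copied from the exception message.
        Instant element = Instant.parse("2017-09-25T13:53:08.725Z");
        Instant windowEnd = Instant.parse("2017-09-25T13:53:08.719Z");

        System.out.println("max timestamp:   " + maxTimestamp(windowEnd));
        System.out.println("moves backwards: " + movesBackwards(element, windowEnd));
    }
}
```

With the values from the message, the element timestamp (08.725) is later than the window end (08.719), so the element appears to have been assigned to a window that ends before its own timestamp.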
Thanks for the additional details. I have expanded my answer to discuss your 'WindowFn'.