1

(5d8e3f411b5a4ccb):java.lang.IllegalStateException:TimestampCombiner从2017-09-25T13移动元素:53:08.725Z更早时间2017-09-25T13:53:08.718 Z for window [2017-09-25T13:53:08.088Z..2017-09-25T13:53:08.719Z)有时让IllegalStateException异常,而在数据流亚军运行管道

可能是什么原因?

WindowFn代码很简单:

public class BQTablePartitionWindowFn extends NonMergingWindowFn<Object, IntervalWindow> { 

/** 
* 
*/ 
private static final long serialVersionUID = 1L; 

private IntervalWindow assignWindow(AssignContext context) { 
    TableRow tableRow = (TableRow) context.element(); 
    String timestamp = tableRow.get(BQConstants.LOG_TIME).toString(); 
    String currentTime = DateUtil.getFormatedDate(new Date()); 
    DateTimeFormatter formatter = DateTimeFormat.forPattern(CommonConstants.DATE_FORMAT_YYYYMMDD_HHMMSS_SSS) 
      .withZoneUTC(); 
    Instant start_point = Instant.parse(timestamp, formatter); 
    Instant end_point = Instant.parse(currentTime, formatter); 

    return new IntervalWindow(start_point, end_point); 
}; 

@Override 
public Coder<IntervalWindow> windowCoder() { 
    return IntervalWindow.getCoder(); 
} 

@Override 
public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception { 
    return Arrays.asList(assignWindow(c)); 
} 

@Override 
public boolean isCompatible(WindowFn<?, ?> other) { 
    return false; 
} 

@Override 
public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() { 
    throw new IllegalArgumentException(
      "Attempted to get side input window for GlobalWindow from non-global WindowFn"); 
} 

}

+0

感谢您的补充细节。我已经扩展了我的答案来讨论你的'WindowFn'。 –

回答

1

GroupByKey的默认行为是输出iterables与时间戳是允许在窗口中的最大时间戳。对于你的窗口,这是时间戳13:53:08.718Z

元素具有时间戳13:53:08.725Z,它不在从13:53:08.088Z13:53:08.719Z的窗口中。

你能分享你的WindowFn以及你有调整时间戳的ParDo吗?

更新:感谢您分享您的WindowFn。有几件事会对你造成问题。

1.分配窗口的开始时间不是基于元素的时间戳。

您提取元素的列并根据context.element().get(BQConstants.LOG_TIME)(忽略强制转换和分析)的值分配窗口。从你的错误信息看来,这不是context.timestamp()的实际值,它是元素的事件时间时间戳。

取而代之,您应该编写WindowFn以使用context.timestamp()。您可以确保时间戳是要在根据你的数据是否有界不同的方式是什么:

  • 如果你的数据是有界的,你可以使用WithTimestamps通过提取现场分配时间戳。
  • 如果您的数据是无限的,则源需要知道更多信息以便管理水印,因此配置取决于源。例如,PubsubIO会从您可以指定的属性中读取时间戳。

2.到指定的窗口的结束时间是基于系统日期

几个问题:

  • 终止时间(四舍五入),可能先开始时间,导致无效的窗口。
  • 结束时间不确定。 Beam的一般期望是,你将确定性地基于元素的时间戳(其必须落在窗口的结尾之前)并且其次在元素本身上确定性地分配窗口。指定这样的非确定性窗口可能具有无法预料的缺点。一个已知问题是您的结果不可重现,如果您需要修复数据处理错误或对归档数据运行实验,这可能会造成麻烦。这取决于你的使用情况,但你可能会考虑一些更具前瞻性的东西。

这里的目标是什么?您是否将此设置为仅为动态目标提取端点?如果是这样,我会建议将数据分成什么时候发生,而不是在什么时候处理。

+0

请参阅上面编辑 – Jack

+0

谢谢!我相应地扩展了我的答案。 –

+0

感谢您的解释。 我可以进行更改以避免占用系统时间。------------------------- 并且是的,我正在设置此项只是为了提取动态端点目的地。我需要根据logtime设置分区。 – Jack