2016-12-14 115 views
1

在Flink中,我正在使用readTextFile读取文件并将60毫秒的SlidingProcessingTimeWindows.of(Time.milliseconds(60), Time.milliseconds(60))应用于滑动60毫秒。在窗口流上,我正在计算元组的第二个字段的均值。我的文本文件包含1100行,每行都是元组(String,Integer)。我已将并行性设置为1,并在元组的第一个字段上键入消息。滑动处理时间窗口计算不一致的结果

当我运行代码时,每次得到不同的答案。我的意思是,它看起来像,有时读取整个文件,有时它读取文件的第一行。它与滑动窗口的大小有关系吗?如何找出这种关系,以便我可以决定窗口的大小和滑动量?

+1

使用处理时间时,您不能指望一致的可重复结果。在你的情况下,它只是一个问题,可以在60毫秒内处理多少文件,这取决于系统负载等等。有时它能够处理整个文件,有时它不能;这很正常。看看http://data-artisans.com/how-apache-flink-enables-new-streaming-applications-part-1/了解更多信息。 –

回答

1

AlpineGizmo评论中的答案是正确的。我会在这里添加更多的细节。

Flink将时间窗口对齐到时期开始(1970-01-01-00:00:00)。这意味着窗口操作员具有1小时的窗口,并且每个新的小时(即,在00:00,01:00,02:00,...)开始新的窗口,而不是第一个到达的记录。

处理时间窗口基于系统的当前时间进行评估。 正如上面评论所述,这意味着可以处理的数据量取决于操作员运行的机器的处理资源(硬件,CPU/IO负载等)。因此,处理时间窗口不能产生可靠和一致的结果。

我的情况,两个描述的效果可能会导致跨作业不一致的结果。根据何时开始作业,数据将被分配到不同的窗口(如果第一个记录在第一个60毫秒窗口关闭之前到达,则只有该元素将在窗口中)。根据机器的IO负载,访问和读取文件可能需要更多或更少的时间。

如果您想要获得一致的结果,则需要使用事件时间。在这种情况下,基于数据中编码的时间来处理记录,即,结果仅取决于数据,而不取决于外部效果,例如作业的开始时间或处理机器的负载。