2017-05-04 63 views
0

我不知道在Apache Storm中处理以下问题的最佳做法是什么。风暴:不同大小的几个滑动窗口的最小/最大聚合

我有一个喷嘴,生成一个带有明确时间戳的整数值流。我们的目标是用三个滑动窗口在这个流以最小/最大聚集:

  • 最后一小时
  • 最后一天,即过去24小时

最后时刻很简单:

topology.setBolt("1h", ...) 
    .shuffleGrouping("spout") 
    .withWindow(Duration.hours(1), Duration.seconds(10)) 
    .withTimestampField("timestamp")); 

但是,对于较长的时间段,我担心窗口的队列大小。当我像最后一小时聚合一样直接从喷口中获取元组时,每个元组都会在队列中结束。

一种可能性是从预先聚合的“1h”螺栓中消耗元组。但是,因为我使用明确的时间戳,所以从“1h”螺栓到达的延迟元组被忽略。 1小时的延迟不是一个选项,因为这会延迟对窗口的评估。有没有办法在不影响结果及时性的情况下“延迟”元组?

当然,我也可以每小时存储一个聚合数据,然后计算最近24小时的最小值,包括来自“1h”流的最新值。但我很好奇,如果有一种方法可以正确使用Storm手段来做到这一点。

更新1

由于arunmahadevan的答案,我改变了1H分钟螺栓发出的所有元组在各个1H窗口中的最大时间戳最小的元组。这样消费螺栓不会因为迟到而丢弃元组。我还介绍了一个新字段original-timestamp以保留最小元组的原始时间戳。

更新2

我最后通过仅发射在1H分钟螺栓的状态变化中发现的甚至更好的方法。只要没有接收到新的元组,Storm就不会在消费螺栓中增加时间,因此阻止了迟到问题。另外,我可以保留原始时间戳而不将其复制到单独的字段中。

回答

0

我认为周期性地发出min从“1h”到“24h”螺栓应该工作并且保持“24h”队列大小被检查。

如果配置滞后,则只有在该滞后后(即事件时间超过滑动间隔+滞后时)才调用螺栓的执行。

可以说,如果“1h”螺栓配置滞后1分钟,则只有在事件时间超过02:01后才会在01:00 - 02:00之间为元组调用执行。 (即螺栓已经看到时间戳≥2:01的事件)。然而,执行只会在01:00和02:00之间收到元组。

现在,如果计算最后一小时的最小值,并将结果发送到滑动间隔为1小时和滞后= 0的“24小时”螺栓,它将触发一旦进入事件的时间戳穿过下一个小时。如果您在02:00发出01:00-02:00分钟,一旦收到分钟事件,“24h”窗口将触发(对于前一天02:00至02:00之间的事件)因为事件时间超过了下一个小时,并且配置的延迟为0。

+0

确定您真正帮助我的答案中至关重要的部分是发出最低限度的最小时间戳窗口。 –

+0

您是否对如何实现平均聚合相同的想法?这是不同的,因为我不能在1h螺栓上使用滑动窗口,因为这会导致相同的元组被平均多次合并。有没有办法将基于翻转窗口的预聚合与来自源喷口的最新元素结合起来?一旦它们被来自翻滚窗口的聚合物覆盖,我就需要从队列中驱逐源喷口元素。但在风暴中似乎没有可能实施自定义驱逐政策...... –