2016-09-16 114 views
1

我计算过timewindow计数(求和1)如下:弗林克timeWindow得到启动时间

mappedUserTrackingEvent 
      .keyBy("videoId", "userId") 
      .timeWindow(Time.seconds(30)) 
      .sum("count") 

我想实际添加的窗口起始时间作为重点领域了。所以结果会是这样的:

key: videoId=123,userId=234,time=2016-09-16T17:01:30 
value: 50 

因此,本质上聚合窗口计数。最终目标是绘制这些窗口的直方图。

如何添加窗口的开始作为密钥中的字段?然后在这种情况下将窗口对齐到00或30秒?那可能吗?

回答

2

WindowFunctionapply()方法提供了Window对象,它是一个TimeWindow如果使用keyBy().timeWindow()TimeWindow对象有两种方法,分别为getStart()getEnd(),分别返回窗口开始和结束的时间戳。

目前不可能将sum()聚合与WindowFunction一起使用。你需要做的是这样的:

mappedUserTrackingEvent 
     .keyBy("videoId", "userId") 
     .timeWindow(Time.seconds(30)) 
     .apply(new MySumReduceFunction(), new MyWindowFunction());` 

MySumReduceFunction实现ReduceFunction接口,并通过增量聚集在窗口到达元素计算总和。 MyWindowFunction执行WindowFunction。它通过参数Iterable接收汇总值,并使用从TimeWindow参数获得的时间戳来丰富该值。

0

您可以使用方法aggregate而不是sum。
aggregate设置第二个参数执行WindowFunction或延伸ProcessWindowFunction
我使用弗林克-1.4.0,推荐使用ProcessWindowFunction,如:

mappedUserTrackingEvent 
    .keyBy("videoId", "userId") 
    .timeWindow(Time.seconds(30)) 
    .aggregate(new Count(), new MyProcessWindowFunction(); 

public static class MyProcessWindowFunction extends ProcessWindowFunction<Integer, Tuple2<Long, Integer>, Tuple, TimeWindow> 
{ 
    @Override 
    public void process(Tuple tuple, Context context, Iterable<Integer> iterable, Collector<Tuple2<Long, Integer>> collector) throws Exception 
    { 
     context.currentProcessingTime(); 
     context.window().getStart(); 
    } 
}