2017-04-22 46 views
1

私人的数据流中buySideVolumeWMA(数据流buyPressureTradeStream){如何让Apache的弗林克滑动窗口只滑动到达窗口大小之后?

Integer windowSize = 3; 
    Integer windowslide = 1; 

    DataStream<Double> buySideVolumeWMAStream = buyPressureTradeStream.countWindowAll(windowSize, windowslide) 
      .apply(new AllWindowFunction<String, Double, GlobalWindow>() { 

       @Override 
       public void apply(GlobalWindow window, Iterable<String> values, Collector<Double> out) 
         throws Exception { 
        Double buySideVolumeWMA = 0.0; 
        Integer weight = windowSize; 
        Integer numerator = 1; 

        for (String tradeString : values) { 
         JSONObject json = new JSONObject(tradeString); 
         Double tradeVolume = (Double) json.get("Volume"); 
         buySideVolumeWMA += ((tradeVolume * numerator)/weight); 
         slf4jLogger.info("tradeVolume " + tradeVolume + " , " + "numerator , " + numerator 
           + " weight , " + weight + " buySideVolumeWMA " + buySideVolumeWMA); 
         numerator++; 

        } 
        numerator = 1; 

        out.collect(buySideVolumeWMA/2); 
        buySideVolumePressure = buySideVolumeWMA/2; 
        // slf4jLogger.info("buySideVolumePressure :" + 
        // buySideVolumePressure); 


    buySideVolumeWMAStream.print().setParallelism(5); 

    return buySideVolumeWMAStream; 

} 

================================= =======================================在这个程序中,我使用的3窗口大小和幻灯片尺寸1.我希望它开始滑动一旦接收流数3的数据,然后只能由1开始滑动,但发生的事情是我的程序启动,因为它首先接收数据立即滑动,然后将其滑入每单它接收的数据。所以如何让它滑动后才能收到count 3的数据然后再滑动1?

+0

你的问题还不清楚。你能详细说明你的问题吗?你应该添加一个你得到的和你想要的东西的例子。 – ImbaBalboa

+0

@ImbaBalboa。感谢您的回复。我在我的程序下面添加了一些关于我的问题的更多详细信息。请您在此引导我?谢谢 – Dhinesh

回答

1

您可以添加偏移到你的窗口。这是Window命令的第三个参数。这样你可以在我的意见后面开始。从文档

例子:

// sliding processing-time windows offset by -8 hours 
input 
    .keyBy(<key selector>) 
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8))) 
    .<windowed transformation>(<window function>); 

要了解更多信息:https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/windows.html