1
私人的数据流中buySideVolumeWMA(数据流buyPressureTradeStream){如何让Apache的弗林克滑动窗口只滑动到达窗口大小之后?
Integer windowSize = 3;
Integer windowslide = 1;
DataStream<Double> buySideVolumeWMAStream = buyPressureTradeStream.countWindowAll(windowSize, windowslide)
.apply(new AllWindowFunction<String, Double, GlobalWindow>() {
@Override
public void apply(GlobalWindow window, Iterable<String> values, Collector<Double> out)
throws Exception {
Double buySideVolumeWMA = 0.0;
Integer weight = windowSize;
Integer numerator = 1;
for (String tradeString : values) {
JSONObject json = new JSONObject(tradeString);
Double tradeVolume = (Double) json.get("Volume");
buySideVolumeWMA += ((tradeVolume * numerator)/weight);
slf4jLogger.info("tradeVolume " + tradeVolume + " , " + "numerator , " + numerator
+ " weight , " + weight + " buySideVolumeWMA " + buySideVolumeWMA);
numerator++;
}
numerator = 1;
out.collect(buySideVolumeWMA/2);
buySideVolumePressure = buySideVolumeWMA/2;
// slf4jLogger.info("buySideVolumePressure :" +
// buySideVolumePressure);
buySideVolumeWMAStream.print().setParallelism(5);
return buySideVolumeWMAStream;
}
================================= =======================================在这个程序中,我使用的3窗口大小和幻灯片尺寸1.我希望它开始滑动一旦接收流数3的数据,然后只能由1开始滑动,但发生的事情是我的程序启动,因为它首先接收数据立即滑动,然后将其滑入每单它接收的数据。所以如何让它滑动后才能收到count 3的数据然后再滑动1?
你的问题还不清楚。你能详细说明你的问题吗?你应该添加一个你得到的和你想要的东西的例子。 – ImbaBalboa
@ImbaBalboa。感谢您的回复。我在我的程序下面添加了一些关于我的问题的更多详细信息。请您在此引导我?谢谢 – Dhinesh