1

我对Spark还是比较新的,我正在努力实现迭代函数。我希望有人能帮助我?Spark重复函数CUSUM

特别是,我试图实现CUSUM控制统计:

$ S_I = \ MAX(0,S_ {I-1} + X_I - 目标 - 含$以$ S_0 = 0 $和$瓦特,目标$固定参数。

的挑战是,CUSUM统计量定义为迭代函数需要有序数据和前一函数值。

下面的数据帧显示所期望的输出对于$ Target = 1 $和$ w = 0.1 $:

i x S 
-------------- 
1 1.3 0.2 
2 1.8 0.9 
3 0.5 0.3 
4 0.6 0 
5 1.2 0.1 
6 1.8 0.8 

在不同的说明:我想这是不可能以分布式方式运行CUSUM?我的数据集相当大,但包含多个组。我希望这意味着我仍然可以实现一些并发。我想我必须重新分区我的数据,让每个组有一个分区才能同时为每个组运行CUSUM算法?

我希望这是有道理的,任何指针都非常感谢! 理想情况下,我正在寻找一个解决方案在斯卡拉和火花2.1

非常感谢!

回答

0

很多谷歌的研究后,我发现使用mapPartitions

val dataset = Seq(1.3, 1.8, 0.5, 0.6, 1.2, 1.8).toDS 

dataset.repartition(1).mapPartitions(iterator => { 
    var s = 0.0 
    val target = 1.0 
    val w = 0.1 
    iterator.map(x => { 
     s = Math.max(0.0, s + x -target - w) 
     Math.round(10.0 *s)/10.0 
    }) 
}).show() 

+-----+ 
|value| 
+-----+ 
| 0.2| 
| 0.9| 
| 0.3| 
| 0.0| 
| 0.1| 
| 0.8| 
+-----+ 

我希望这一段时间救一个人在未来的问题的解决方案。