我有一个RDD
,我想获得当前位置前的平均值(包括当前位置)在RDD
例如:如何在火花/斯卡拉的RDD当前位置前获得平均值
inputRDD:
1, 2, 3, 4, 5, 6, 7, 8
output:
1, 1.5, 2, 2.5, 3, 3.5, 4, 4.5
这是我的尝试:
val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8),4)
var sum=0.0
var index=0.0
val partition=rdd.getNumPartitions
rdd.zipWithIndex().collect().foreach(println)
rdd.zipWithIndex().sortBy(x=>{x._2},true,1).mapPartitions(ite=>{
var result=new ArrayBuffer[Tuple2[Double,Long]]()
while (ite.hasNext){
val iteNext=ite.next()
sum+=iteNext._1
index+=1
var avg:Double=sum/index
result.append((avg,iteNext._2))
}
result.toIterator
}).sortBy(x=>{x._2},true,partition).map(x=>{x._1}).collect().foreach(println)
我必须repartition
到1然后用数组计算的话,它是如此低效。
是否有任何更清洁的解决方案,而不使用阵列在4个分区?
,因为我的输出类型是RDD,何时我将日期框架转换为RDD(df.rdd),分区更改为1,当我将日期框架转换为RDD时,是否有任何方法可以使分区不发生更改 – mentongwu