2017-06-06 73 views
0

我有一个RDD,我想获得当前位置前的平均值(包括当前位置)在RDD 例如:如何在火花/斯卡拉的RDD当前位置前获得平均值

inputRDD: 
1, 2, 3, 4, 5, 6, 7, 8 

output: 
1, 1.5, 2, 2.5, 3, 3.5, 4, 4.5 

这是我的尝试:

val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8),4) 
    var sum=0.0 
    var index=0.0 
    val partition=rdd.getNumPartitions 
    rdd.zipWithIndex().collect().foreach(println) 
    rdd.zipWithIndex().sortBy(x=>{x._2},true,1).mapPartitions(ite=>{ 
     var result=new ArrayBuffer[Tuple2[Double,Long]]() 
     while (ite.hasNext){ 
     val iteNext=ite.next() 
     sum+=iteNext._1 
     index+=1 
     var avg:Double=sum/index 
     result.append((avg,iteNext._2)) 
     } 
     result.toIterator 
    }).sortBy(x=>{x._2},true,partition).map(x=>{x._1}).collect().foreach(println) 

我必须repartition到1然后用数组计算的话,它是如此低效。

是否有任何更清洁的解决方案,而不使用阵列在4个分区?

回答

0

对不起,我不使用Scala和希望你能读懂它

df = spark.createDataFrame(map(lambda x: (x,), range(1, 9)), ['val']) 
df = df.withColumn('spec_avg', 
        f.avg('val').over(Window().orderBy('val').rowsBetween(start=Window.unboundedPreceding, end=0))) 
+0

,因为我的输出类型是RDD,何时我将日期框架转换为RDD(df.rdd),分区更改为1,当我将日期框架转换为RDD时,是否有任何方法可以使分区不发生更改 – mentongwu

0

更简单的解决方案是使用Spark-SQL。 我在这里计算的平均运行的每一行

val df = sc.parallelize(List(1,2,3,4,5,6,7,8)).toDF("col1") 

df.createOrReplaceTempView("table1") 

val result = spark.sql("""SELECT col1, sum(col1) over(order by col1 asc)/row_number() over(order by col1 asc) as avg FROM table1""") 

,或者如果你想使用DataFrames API交替。

import org.apache.spark.sql.expressions._ 
val result = df 
.withColumn("csum", sum($"col1").over(Window.orderBy($"col1"))) 
.withColumn("rownum", row_number().over(Window.orderBy($"col1"))) 
.withColumn("avg", $"csum"/$"rownum") 
.select("col1","avg") 

输出

result.show() 

+----+---+ 
|col1|avg| 
+----+---+ 
| 1|1.0| 
| 2|1.5| 
| 3|2.0| 
| 4|2.5| 
| 5|3.0| 
| 6|3.5| 
| 7|4.0| 
| 8|4.5| 
+----+---+ 
+0

因为我输出的类型是RDD,当我转换dateframe到RDD( df.rdd),分区变为1,有什么办法让分区不会改变当我将日期帧转换为RDD – mentongwu