2017-06-13 88 views
0

我使用Window.sum函数来获取RDD中的值的总和,但是当我将DataFrame转换为RDD时,我发现结果只有一个分区。重新分区何时发生? ?当我在窗口中使用partitionBy时,为什么我用spark/scala得到不同的结果?

val rdd = sc.parallelize(List(1,3,2,4,5,6,7,8), 4) 
    val df = rdd.toDF("values"). 
     withColumn("csum", sum(col("values")).over(Window.orderBy("values"))) 
    df.show() 
    println(s"numPartitions ${df.rdd.getNumPartitions}") 
    // 1 
    //df is: 
// +------+----+ 
// |values|csum| 
// +------+----+ 
// |  1| 1| 
// |  2| 3| 
// |  3| 6| 
// |  4| 10| 
// |  5| 15| 
// |  6| 21| 
// |  7| 28| 
// |  8| 36| 
// +------+----+ 

我添加partitionBy在窗口,但结果是错误,我应该怎么做,这是我改变代码:

 val rdd=sc.parallelize(List(1,3,2,4,5,6,7,8),4) 
     val sqlContext = new SQLContext(m_sparkCtx) 
     import sqlContext.implicits._ 
     val df = rdd.toDF("values").withColumn("csum", sum(col("values")).over(Window.partitionBy("values").orderBy("values"))) 
     df.show() 
     println(s"numPartitions ${df.rdd.getNumPartitions}") 
     //1 
//df is: 
// +------+----+ 
// |values|csum| 
// +------+----+ 
// |  1| 1| 
// |  6| 6| 
// |  3| 3| 
// |  5| 5| 
// |  4| 4| 
// |  8| 8| 
// |  7| 7| 
// |  2| 2| 
// +------+----+ 
+0

我刚刚在你的其他问题中回答了这个问题。 :) –

回答

0

Window功能有partitionBy API用于分组的dataframeorderBy订购按升序或降序分组的行。

在您的第一个案例中,您尚未定义partitionBy,因此所有值都归入一个dataframe以进行排序,从而将数据混合到一个分区中。

但在第二种情况下,您自己在values上定义了partitionBy。因此,由于每个值都是不同的,因此每一行都被分组为单个组。

在第二种情况下的分区是200,因为这是当你还没有定义分区和洗牌发生

要从第二种情况下得到相同的结果与第一个情况下,应该spark定义的默认分区,您需要将您的dataframe与第一种情况一样分组到一个组中。为此,您需要创建另一个具有常数值的column,并将该值用于partitionBy

0

当创建列作为
withColumn("csum", sum(col("values")).over(Window.orderBy("values")))

因为还没有定义partitionBy()方法来定义分区的Window.orderBy("values")被排序在单个分区列“值”的值。

这是从初始4改变partition数为1

分区200是在第二个情况下,由于partitionBy()方法使用200默认分区。如果你需要的分区数为4,你可以使用像repartition(4)coalesce(4)

方法希望你明白了!

相关问题