2017-09-23 38 views
0

的特定领域的排名:火花产生于我已经有了一个<code>rdd</code>作为计算研究结果让我们说这是因为以下格式RDD

(uid, factor, name, avatar, gender, otherFactor1, otherFactor2) 

,现在我想的RDD被factor进行排序,并进行现场像rank指示记录的军衔,后来使用foreach每一个记录写入到数据库

我知道我被强权此:

rdd.sortBy{ 
    case (uid, factor, name, avatar, gender, otherFactor1, otherFactor2) => { 
     factor 
    } 
}.foreach{ 
    //how could I insert a rank field by the index of the loop? 
} 

在这里,我对如何通过foreach循环的指数

任何想法添加rank场卡住?

+2

'rdd.sortBy(_._ 2).zipWithIndex'? – philantrovert

+0

@philantrovert,回答下面的问题:) –

+0

@RameshMaharjan完成。 OP现在可以关闭该问题。 – philantrovert

回答

2

正如评论所说,你可以使用

rdd.sortBy(_._2).zipWithIndex 

你可以用它展平到一个更体面的结构:

rdd.sortBy(_._2).zipWithIndex.map { 
    case ((uid, factor, name, avatar, gender, otherFactor1, otherFactor2), rank) => 
    (uid, factor, name, avatar, gender, otherFactor1, otherFactor2, rank) 
} 

有一件事你可能要注意有关zipWithIndex,从the source code for RDD.scala

此方法需要在此RDD连续时触发点火作业超过一个分区。

如果你想避免这种情况,你可以使用zipWithUniqueId但我不认为它给每个元素的连续索引。

0

看看下面是否有帮助。

case class ItemInfo(item:String, quantity:Int) 
val data = sc.parallelize(List(("a",10),("b",20),("c",30))) 
val ItemDF = data.map(x=> ItemInfo(x._1,x._2)).toDF() 
ItemDF.registerTempTable("Item_tbl") 
val rankedItems = sqlContext.sql("select item, quantity, rank() over(order by quantity desc) as rank from Item_tbl") 
rankedItems.collect().foreach(println) 

本示例根据数量对项目进行排序。

+0

抱歉,还没有尝试使用spark sql的配置单元方式,'zipWithIndex'被@philantrovert评论会做的伎俩 – armnotstrong

相关问题