2014-11-20 123 views
17

我需要为包含许多列的数据表生成row_numbers的完整列表。如何获取Spark RDD的SQL row_number等效项?

在SQL中,这应该是这样的:

select 
    key_value, 
    col1, 
    col2, 
    col3, 
    row_number() over (partition by key_value order by col1, col2 desc, col3) 
from 
    temp 
; 

现在,让我们在星火说,我有以下形式的RDD(K,V),其中V =(COL1,COL2,COL3)所以我的条目都喜欢

(key1, (1,2,3)) 
(key1, (1,4,7)) 
(key1, (2,2,3)) 
(key2, (5,5,5)) 
(key2, (5,5,9)) 
(key2, (7,5,5)) 
etc. 

我想用正确的ROW_NUMBER

(key1, (1,2,3), 2) 
(key1, (1,4,7), 1) 
(key1, (2,2,3), 3) 
(key2, (5,5,5), 1) 
(key2, (5,5,9), 2) 
(key2, (7,5,5), 3) 
etc. 
订购这些使用命令,如sortBy(),sortWith(),sortByKey(),zipWithIndex等,并有一个新的RDD

(我不在乎括号,所以表格也可以是(K,(col1,col2,col3,rownum)))

我该怎么做?

这是我第一次尝试:

val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3)) 

val temp1 = sc.parallelize(sample_data) 

temp1.collect().foreach(println) 

// ((3,4),5,5,5) 
// ((3,4),5,5,9) 
// ((3,4),7,5,5) 
// ((1,2),1,2,3) 
// ((1,2),1,4,7) 
// ((1,2),2,2,3) 

temp1.map(x => (x, 1)).sortByKey().zipWithIndex.collect().foreach(println) 

// ((((1,2),1,2,3),1),0) 
// ((((1,2),1,4,7),1),1) 
// ((((1,2),2,2,3),1),2) 
// ((((3,4),5,5,5),1),3) 
// ((((3,4),5,5,9),1),4) 
// ((((3,4),7,5,5),1),5) 

// note that this isn't ordering with a partition on key value K! 

val temp2 = temp1.??? 

还要注意的是,功能sortBy不能直接应用于RDD,但必须首先运行收集(),然后将输出不是RDD,无论是,但数组

temp1.collect().sortBy(a => a._2 -> -a._3 -> a._4).foreach(println) 

// ((1,2),1,4,7) 
// ((1,2),1,2,3) 
// ((1,2),2,2,3) 
// ((3,4),5,5,5) 
// ((3,4),5,5,9) 
// ((3,4),7,5,5) 

这里有一个小更多的进步,但仍然不分区:

val temp2 = sc.parallelize(temp1.map(a => (a._1,(a._2, a._3, a._4))).collect().sortBy(a => a._2._1 -> -a._2._2 -> a._2._3)).zipWithIndex.map(a => (a._1._1, a._1._2._1, a._1._2._2, a._1._2._3, a._2 + 1)) 

temp2.collect().foreach(println) 

// ((1,2),1,4,7,1) 
// ((1,2),1,2,3,2) 
// ((1,2),2,2,3,3) 
// ((3,4),5,5,5,4) 
// ((3,4),5,5,9,5) 
// ((3,4),7,5,5,6) 
+0

这个问题的其他几个部分回答问题的延伸,即http://stackoverflow.com/questions/23838614/how-to-sort-an-rdd-in-scala-spark,http://qnalist.com/questions/5086896/spark-sql-how-to-select-first-row-in-each-group -by-group,http://mail-archives.apache.org/mod_mbox/spark-user/201408.mbox/%3CD01B658B.2BF52%[email protected]%3E,http://stackoverflow.com/问题/ 270220 59/filter-rdd-based-on-row-number,http://stackoverflow.com/questions/24677180/how-do-i-select-a-range-of-elements-in-spark-rdd – 2014-11-20 22:03:13

+0

I'米也想回答这个问题。 [Hive添加了分析函数(包括0.11中的'row_number()')(https://issues.apache.org/jira/browse/HIVE-896),并且Spark 1.1支持HiveQL/Hive 0.12。所以看起来'sqlContext.hql(“select row_number()over(partition by ...')应该可以,但我得到一个错误。 – dnlbrky 2014-11-23 03:52:44

回答

13

row_number() over (partition by ... order by ...)功能已添加到Spark 1.4。这个答案使用PySpark/DataFrames。

创建测试数据框:

from pyspark.sql import Row, functions as F 

testDF = sc.parallelize(
    (Row(k="key1", v=(1,2,3)), 
    Row(k="key1", v=(1,4,7)), 
    Row(k="key1", v=(2,2,3)), 
    Row(k="key2", v=(5,5,5)), 
    Row(k="key2", v=(5,5,9)), 
    Row(k="key2", v=(7,5,5)) 
    ) 
).toDF() 

添加分区的行数:

from pyspark.sql.window import Window 

(testDF 
.select("k", "v", 
     F.rowNumber() 
     .over(Window 
       .partitionBy("k") 
       .orderBy("k") 
      ) 
     .alias("rowNum") 
     ) 
.show() 
) 

+----+-------+------+ 
| k|  v|rowNum| 
+----+-------+------+ 
|key1|[1,2,3]|  1| 
|key1|[1,4,7]|  2| 
|key1|[2,2,3]|  3| 
|key2|[5,5,5]|  1| 
|key2|[5,5,9]|  2| 
|key2|[7,5,5]|  3| 
+----+-------+------+ 
4

这是一个有趣的问题,你正在提出。我会用Python来回答它,但我相信你可以无缝地翻译到Scala。

这里是我会怎么对付它:

1-简化您的数据:现在

temp2 = temp1.map(lambda x: (x[0],(x[1],x[2],x[3]))) 

TEMP2是一个 “真正” 的键值对。这看起来:

[ 
((3, 4), (5, 5, 5)), 
((3, 4), (5, 5, 9)), 
((3, 4), (7, 5, 5)), 
((1, 2), (1, 2, 3)), 
((1, 2), (1, 4, 7)), 
((1, 2), (2, 2, 3)) 

]

2-然后,使用基团的按功能来再现的分区的效果BY:现在

temp3 = temp2.groupByKey() 

TEMP3是具有2 RDD行:

[((1, 2), <pyspark.resultiterable.ResultIterable object at 0x15e08d0>), 
((3, 4), <pyspark.resultiterable.ResultIterable object at 0x15e0290>)] 

3-现在,您需要为RDD的每个值应用排名函数。在Python中,我会使用简单的排序功能(枚举将创建ROW_NUMBER列):

temp4 = temp3.flatMap(lambda x: tuple([(x[0],(i[1],i[0])) for i in enumerate(sorted(x[1]))])).take(10) 

注意,要实现你的特定的顺序,你将需要养活右“键”的说法(在python,我只想创造一个lambda函数像:

lambda tuple : (tuple[0],-tuple[1],tuple[2]) 

末(没有密钥参数的功能,它看起来像):

[ 
((1, 2), ((1, 2, 3), 0)), 
((1, 2), ((1, 4, 7), 1)), 
((1, 2), ((2, 2, 3), 2)), 
((3, 4), ((5, 5, 5), 0)), 
((3, 4), ((5, 5, 9), 1)), 
((3, 4), ((7, 5, 5), 2)) 

]

希望有所帮助!

祝你好运。