2017-03-06
7

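Consider the following DataFrame: how do I take the maximum of a value and keep all columns (that is, get the max record for each group)?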

+----+-----+---+-----+
| uid|    k|  v|count|
+----+-----+---+-----+
|   a|pref1|  b|  168|
|   a|pref3|  h|  168|
|   a|pref3|  t|   63|
|   a|pref3|  k|   84|
|   a|pref1|  e|   84|
|   a|pref2|  z|  105|
+----+-----+---+-----+

How can I get the maximum count for each (uid, k) group, but still include v?

+----+-----+---+----------+
| uid|    k|  v|max(count)|
+----+-----+---+----------+
|   a|pref1|  b|       168|
|   a|pref3|  h|       168|
|   a|pref2|  z|       105|
+----+-----+---+----------+

I can do something like this, but it drops the column "v":

df.groupBy("uid", "k").max("count") 

Answers

6

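This is a perfect use case for window operators (using the over function) or for a join.

Since you have already figured out how to use windows, I will focus on the join.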

scala> val inventory = Seq(
    | ("a", "pref1", "b", 168), 
    | ("a", "pref3", "h", 168), 
    | ("a", "pref3", "t", 63)).toDF("uid", "k", "v", "count") 
inventory: org.apache.spark.sql.DataFrame = [uid: string, k: string ... 2 more fields] 

scala> val maxCount = inventory.groupBy("uid", "k").max("count") 
maxCount: org.apache.spark.sql.DataFrame = [uid: string, k: string ... 1 more field] 

scala> maxCount.show 
+---+-----+----------+
|uid|    k|max(count)|
+---+-----+----------+
|  a|pref3|       168|
|  a|pref1|       168|
+---+-----+----------+

scala> val maxCount = inventory.groupBy("uid", "k").agg(max("count") as "max") 
maxCount: org.apache.spark.sql.DataFrame = [uid: string, k: string ... 1 more field] 

scala> maxCount.show 
+---+-----+---+
|uid|    k|max|
+---+-----+---+
|  a|pref3|168|
|  a|pref1|168|
+---+-----+---+

scala> maxCount.join(inventory, Seq("uid", "k")).where($"max" === $"count").show 
+---+-----+---+---+-----+
|uid|    k|max|  v|count|
+---+-----+---+---+-----+
|  a|pref3|168|  h|  168|
|  a|pref1|168|  b|  168|
+---+-----+---+---+-----+

4

You can use a window function:

from pyspark.sql.functions import max as max_ 
from pyspark.sql.window import Window 

w = Window.partitionBy("uid", "k") 

df.withColumn("max_count", max_("count").over(w)) 
+0

Close: it adds a column with the maximum value, but it still keeps all the rows. – jfgosselin
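
The step the comment is missing is a filter: once every row carries its group's maximum, keep only the rows whose count equals that maximum. In PySpark this would presumably be a where clause comparing the two columns, chained after the withColumn call. The sketch below models the same two-step logic in plain Python, without Spark, on the question's sample rows; the function name rows_at_group_max is hypothetical.

```python
from collections import defaultdict

# Sample rows from the question: (uid, k, v, count).
rows = [
    ("a", "pref1", "b", 168),
    ("a", "pref3", "h", 168),
    ("a", "pref3", "t", 63),
    ("a", "pref3", "k", 84),
    ("a", "pref1", "e", 84),
    ("a", "pref2", "z", 105),
]


def rows_at_group_max(rows):
    """Keep the rows whose count equals the maximum of their (uid, k) group."""
    # Step 1, the "window" part: one maximum per (uid, k) partition.
    group_max = defaultdict(lambda: float("-inf"))
    for uid, k, _v, count in rows:
        group_max[(uid, k)] = max(group_max[(uid, k)], count)
    # Step 2, the filter the comment asks for: drop rows below their group max.
    return [r for r in rows if r[3] == group_max[(r[0], r[1])]]


result = rows_at_group_max(rows)
for row in result:
    print(row)
```

On the sample data this leaves the three rows shown in the question's expected output (v = b, h and z).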

2

Here is the best solution I have come up with so far:

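import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, dense_rank}

val w = Window.partitionBy("uid", "k").orderBy(col("count").desc)

df.withColumn("rank", dense_rank().over(w)).where(col("rank") === 1).select("uid", "k", "v", "count").show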
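
One property of this solution is worth knowing: dense_rank gives tied rows the same rank, so if two rows in a group share the maximum count, both survive the rank filter. Spark's row_number would instead keep exactly one row per group, picked arbitrarily among ties unless the ordering breaks them. Below is a plain-Python model of the two behaviours, without Spark; the helper names and the extra tied row are made up for illustration.

```python
from itertools import groupby

# (uid, k, v, count); the last row is a made-up tie for the pref1 maximum.
rows = [
    ("a", "pref1", "b", 168),
    ("a", "pref1", "e", 84),
    ("a", "pref1", "x", 168),
]


def group_key(row):
    """Partition key, mirroring partitionBy("uid", "k")."""
    return (row[0], row[1])


def top_by_dense_rank(rows):
    """Model of dense_rank() == 1: every row tied for the group max is kept."""
    out = []
    for _, grp in groupby(sorted(rows, key=group_key), key=group_key):
        grp = list(grp)
        best = max(r[3] for r in grp)
        out.extend(r for r in grp if r[3] == best)
    return out


def top_by_row_number(rows):
    """Model of row_number() == 1: exactly one row per group."""
    out = []
    for _, grp in groupby(sorted(rows, key=group_key), key=group_key):
        # Python's max() returns the first maximal row; Spark makes no such
        # promise about which tied row it picks.
        out.append(max(grp, key=lambda r: r[3]))
    return out


print(top_by_dense_rank(rows))
print(top_by_row_number(rows))
```

If exactly one row per group is required, a tie-breaking column in the window ordering makes the choice deterministic.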