2017-05-03 51 views
2

我有表,看起来像这样:如何计算匹配相关条件的行?

TripID | Name | State 
    1  | John | OH  
    2  | John | OH 
    3  | John | CA 
    4  | John | OH 
    1  | Mike | CA 
    2  | Mike | CA 
    3  | Mike | OH 

我想指望谁前往OH民为先,然后CA.

在上述情况下它会是约翰只是所以答案应该是1

所以我想知道我们如何可以设置SQL过滤一定的顺序来筛选结果呢?

+1

如果您没有指定排序的列,那么首先是任意的。 –

+0

嘿@vkp你能在这里更具体吗? –

回答

2

我可能误解也许你的问题,但如果你问:

有多少人前往OH,然后再以CA.

(的草图)查询可能如下:

scala> trips.show 
+------+----+-----+ 
|tripid|name|state| 
+------+----+-----+ 
|  1|John| OH| 
|  2|John| OH| 
|  3|John| CA| 
|  4|John| OH| 
|  1|Mike| CA| 
|  2|Mike| CA| 
|  3|Mike| OH| 
+------+----+-----+ 

scala> trips.orderBy("name", "tripid").groupBy("name").agg(collect_list("state")).show 
+----+-------------------+ 
|name|collect_list(state)| 
+----+-------------------+ 
|John| [OH, OH, CA, OH]| 
|Mike|  [CA, CA, OH]| 
+----+-------------------+ 

正如现在我明白了,你有两个选择:

  1. (硬)写用户定义的聚合函数(UDAF)将执行聚合(并将用包含不同状态的行程代替collect_list)。

  2. (更简单)编写一个用户定义的函数(UDF),它可以执行与上面的UDAF类似的工作(但在collect_list已收集值之后)。

  3. (容易)使用functions(如explode和/或window

让我们做最简单的办法(不一定是最有效的!)。

事实证明,groupBy以前并不是真的必要(!)您可以单独使用窗口聚合(使用两次)来处理它。

import org.apache.spark.sql.expressions.Window 
val byName = Window.partitionBy("name").orderBy("tripid") 

val distinctStates = trips.withColumn("rank", rank over byName).dropDuplicates("name", "state").orderBy("name", "rank") 

scala> distinctStates.show 
+------+----+-----+----+ 
|tripid|name|state|rank| 
+------+----+-----+----+ 
|  1|John| OH| 1| 
|  3|John| CA| 3| 
|  1|Mike| CA| 1| 
|  3|Mike| OH| 3| 
+------+----+-----+----+ 

// rank again but this time use the pre-calculated distinctStates dataset 
val distinctStatesRanked = distinctStates.withColumn("rank", rank over byName).orderBy("name", "rank") 

scala> distinctStatesRanked.show 
+------+----+-----+----+ 
|tripid|name|state|rank| 
+------+----+-----+----+ 
|  1|John| OH| 1| 
|  3|John| CA| 2| 
|  1|Mike| CA| 1| 
|  3|Mike| OH| 2| 
+------+----+-----+----+ 

val left = distinctStatesRanked.filter($"state" === "OH").filter($"rank" === 1) 
val right = distinctStatesRanked.filter($"state" === "CA").filter($"rank" === 2) 
scala> left.join(right, "name").show 
+----+------+-----+----+------+-----+----+ 
|name|tripid|state|rank|tripid|state|rank| 
+----+------+-----+----+------+-----+----+ 
|John|  1| OH| 1|  3| CA| 2| 
+----+------+-----+----+------+-----+----+