我可能误解也许你的问题,但如果你问:
有多少人前往OH,然后再以CA.
(的草图)查询可能如下:
scala> trips.show
+------+----+-----+
|tripid|name|state|
+------+----+-----+
| 1|John| OH|
| 2|John| OH|
| 3|John| CA|
| 4|John| OH|
| 1|Mike| CA|
| 2|Mike| CA|
| 3|Mike| OH|
+------+----+-----+
scala> trips.orderBy("name", "tripid").groupBy("name").agg(collect_list("state")).show
+----+-------------------+
|name|collect_list(state)|
+----+-------------------+
|John| [OH, OH, CA, OH]|
|Mike| [CA, CA, OH]|
+----+-------------------+
正如现在我明白了,你有两个选择:
(硬)写用户定义的聚合函数(UDAF)将执行聚合(并将用包含不同状态的行程代替collect_list
)。
(更简单)编写一个用户定义的函数(UDF),它可以执行与上面的UDAF类似的工作(但在collect_list
已收集值之后)。
(容易)使用functions(如explode
和/或window
)
让我们做最简单的办法(不一定是最有效的!)。
事实证明,groupBy
以前并不是真的必要(!)您可以单独使用窗口聚合(使用两次)来处理它。
import org.apache.spark.sql.expressions.Window
val byName = Window.partitionBy("name").orderBy("tripid")
val distinctStates = trips.withColumn("rank", rank over byName).dropDuplicates("name", "state").orderBy("name", "rank")
scala> distinctStates.show
+------+----+-----+----+
|tripid|name|state|rank|
+------+----+-----+----+
| 1|John| OH| 1|
| 3|John| CA| 3|
| 1|Mike| CA| 1|
| 3|Mike| OH| 3|
+------+----+-----+----+
// rank again but this time use the pre-calculated distinctStates dataset
val distinctStatesRanked = distinctStates.withColumn("rank", rank over byName).orderBy("name", "rank")
scala> distinctStatesRanked.show
+------+----+-----+----+
|tripid|name|state|rank|
+------+----+-----+----+
| 1|John| OH| 1|
| 3|John| CA| 2|
| 1|Mike| CA| 1|
| 3|Mike| OH| 2|
+------+----+-----+----+
val left = distinctStatesRanked.filter($"state" === "OH").filter($"rank" === 1)
val right = distinctStatesRanked.filter($"state" === "CA").filter($"rank" === 2)
scala> left.join(right, "name").show
+----+------+-----+----+------+-----+----+
|name|tripid|state|rank|tripid|state|rank|
+----+------+-----+----+------+-----+----+
|John| 1| OH| 1| 3| CA| 2|
+----+------+-----+----+------+-----+----+
如果您没有指定排序的列,那么首先是任意的。 –
嘿@vkp你能在这里更具体吗? –