首先,它不是一个真正的很“简单”查询......
首先 - 让我们创建dataframes一些样本数据 - 我创造了小case类,只有时间和一个字符串值,你可以替换它们更复杂的类:
case class A(time: Long, aValue: String)
case class B(time: Long, bValue: String)
val tableA = Seq(A(1, "q1"), A(2, "q2"), A(3, "q3"), A(4, "q4"), A(5, "q5"), A(6, "q6"), A(7, "q7"))
val tableB = Seq(B(2, "t1"), B(5, "t2"), B(7, "t3"))
val dfA: DataFrame = sqlContext.createDataFrame(tableA)
val dfB: DataFrame = sqlContext.createDataFrame(tableB)
现在 - 两个备选方案(其概念上相同):
使用SQL:
dfA.registerTempTable("a")
dfB.registerTempTable("b")
sqlContext.sql(
"""
|SELECT collect_list(c.time), collect_list(c.aValue), first(b.time), first(b.bValue)
|FROM (
| SELECT FIRST(a.time) as time, FIRST(a.aValue) as aValue, MIN(b.time) AS bTime
| FROM a
| JOIN b ON b.time > a.time
| GROUP BY a.time) AS c
|JOIN b ON c.bTime = b.time
|GROUP BY b.time
""".stripMargin).show()
这将打印,的B(时间和bValue)的时间列表和的值的列表中的每个值。
使用DataFrames:
import org.apache.spark.sql.functions._
val aWithMinB: DataFrame = dfA
.join(dfB, dfA("time") < dfB("time"))
.groupBy(dfA("time"))
.agg(first(dfA("aValue")), min(dfB("time")))
.withColumnRenamed("FIRST(aValue)", "aValue")
.withColumnRenamed("min(time)", "bTime")
aWithMinB
.join(dfB, dfB("time") === aWithMinB("bTime"))
.groupBy(dfB("time"))
.agg(collect_list(aWithMinB("time")), collect_list(aWithMinB("aValue")), first(dfB("time")), first(dfB("bValue")))
.show()
注意两个只会星火1.6.0或更高版本的工作,因为collect_list
在早期版本中并不存在。
UPDATE:这里的流量的一些解释:
- 的第一个查询(在SQL版本内查询)是指在表
a
应该建立一个“共同价值”的所有记录将分组为成结果中的单个记录
- 这是什么常见的值?应分组的
a
中的值是b
中两个连续记录之间的值。所以,他们共享最低值b.time
那就是大于那么他们的时间。换句话说 - 对于a
中的每次X,我们寻找大于X的b
中最小的最小时间。这将是相同的值在a
所有记录之间的两个连续b
小号
- 为了实现这个目标,我们同
a
与b
与b.time > a.time
(获得的b
多条记录为a
每个记录),然后条件通过a.time
(收缩的结果返回到a
每条记录一个记录),取最低b.time
为每个这样的记录,每一列a
的第一值(以第一组并不重要 - 所有分组的记录对于所有具有相同的值)
- 现在我们对
a
中的每个记录都有这个“额外信息”,我们将其与b
联系起来,并在time
列中将其与该列分组。所有a
与bTime
相同的记录将被加入到相应的b
记录中,我们完成了:我们再次对b
的所有列使用first
(同样,对于所有分组记录,所有值都相同,因为我们对b
进行了分组, s的唯一标识符),并在a
的列上使用collect_list
将所有值作为列表获取。
我们需要示例数据 - 向我们展示您的底层数据集的外观(以我们可以将粘贴复制到自己的外壳的方式)。否则,我们不知道如何正确地转换您的数据! –
我没有报告数据,因为我将它包装在多个案例类中以便于操作。所以我会粘贴原始数据 – mastro
@KatyaHandler我刚刚添加了数据的快照。在原始数据集中,DATE字段也会发生变化,应该在查询 – mastro