2016-03-30 104 views
-1

我首先说我是SQL新手,所以这个问题可能是微不足道的。 我有两个带时间戳键的表。 对于每个事件t_itable 1我希望所有的事件qtable 2这样的:SQL简单加入查询

q.timeStamp < t_i.timeStamp and q.timeStamp > t_{i-1}.timeStamp 

也就是说,如果事件按照这个顺序时间戳发生:

q1 
t1 q2 
    q3 
    q4 
t2 q5 
    q6 
t3 q7 

则结果查询应该是:

t1: q1 
t2: q2 q3 q4 
t3: q5 q6 

我正在使用Scala与SQL Spark和DataSet a nd DataFrame类,所以无论是纯函数式的“groupBy”还是SQL查询都不错。

+1

我们需要示例数据 - 向我们展示您的底层数据集的外观(以我们可以将粘贴复制到自己的外壳的方式)。否则,我们不知道如何正确地转换您的数据! –

+0

我没有报告数据,因为我将它包装在多个案例类中以便于操作。所以我会粘贴原始数据 – mastro

+0

@KatyaHandler我刚刚添加了数据的快照。在原始数据集中,DATE字段也会发生变化,应该在查询 – mastro

回答

1

首先,它不是一个真正的很“简单”查询......

首先 - 让我们创建dataframes一些样本数据 - 我创造了小case类,只有时间和一个字符串值,你可以替换它们更复杂的类:

case class A(time: Long, aValue: String) 
case class B(time: Long, bValue: String) 

val tableA = Seq(A(1, "q1"), A(2, "q2"), A(3, "q3"), A(4, "q4"), A(5, "q5"), A(6, "q6"), A(7, "q7")) 
val tableB = Seq(B(2, "t1"), B(5, "t2"), B(7, "t3")) 

val dfA: DataFrame = sqlContext.createDataFrame(tableA) 
val dfB: DataFrame = sqlContext.createDataFrame(tableB) 

现在 - 两个备选方案(其概念上相同):

  1. 使用SQL

    dfA.registerTempTable("a") 
    dfB.registerTempTable("b") 
    
    sqlContext.sql(
        """ 
        |SELECT collect_list(c.time), collect_list(c.aValue), first(b.time), first(b.bValue) 
        |FROM (
        | SELECT FIRST(a.time) as time, FIRST(a.aValue) as aValue, MIN(b.time) AS bTime 
        | FROM a 
        | JOIN b ON b.time > a.time 
        | GROUP BY a.time) AS c 
        |JOIN b ON c.bTime = b.time 
        |GROUP BY b.time 
        """.stripMargin).show() 
    

    这将打印,的B(时间和bValue)的时间列表和的值的列表中的每个值。

  2. 使用DataFrames

    import org.apache.spark.sql.functions._ 
    
    val aWithMinB: DataFrame = dfA 
        .join(dfB, dfA("time") < dfB("time")) 
        .groupBy(dfA("time")) 
        .agg(first(dfA("aValue")), min(dfB("time"))) 
        .withColumnRenamed("FIRST(aValue)", "aValue") 
        .withColumnRenamed("min(time)", "bTime") 
    
    aWithMinB 
        .join(dfB, dfB("time") === aWithMinB("bTime")) 
        .groupBy(dfB("time")) 
        .agg(collect_list(aWithMinB("time")), collect_list(aWithMinB("aValue")), first(dfB("time")), first(dfB("bValue"))) 
        .show() 
    

注意两个只会星火1.6.0或更高版本的工作,因为collect_list在早期版本中并不存在。

UPDATE:这里的流量的一些解释:

  • 的第一个查询(在SQL版本内查询)是指在表a应该建立一个“共同价值”的所有记录将分组为成结果中的单个记录
  • 这是什么常见的值?应分组的a中的值是b中两个连续记录之间的值。所以,他们共享最低b.time那就是大于那么他们的时间。换句话说 - 对于a中的每次X,我们寻找大于X的b中最小的最小时间。这将是相同的值a所有记录之间的两个连续b小号
  • 为了实现这个目标,我们同abb.time > a.time(获得的b多条记录为a每个记录),然后条件通过a.time(收缩的结果返回到a每条记录​​一个记录),取最低b.time为每个这样的记录,每一列a第一值(以第一组并不重要 - 所有分组的记录对于所有具有相同的值)
  • 现在我们对a中的每个记录都有这个“额外信息”,我们将其与b联系起来,并在time列中将其与该列分组。所有abTime相同的记录将被加入到相应的b记录中,我们完成了:我们再次对b的所有列使用first(同样,对于所有分组记录,所有值都相同,因为我们对b进行了分组, s的唯一标识符),并在a的列上使用collect_list将所有值作为列表获取。
+0

中考虑这个问题,但背后的原因尚不清楚,因为我之前没有SQL经验,您能向我解释吗? – mastro

+0

增加了详细的解释,希望这有助于。如果没有,对不起:(拿一个SQL教程来理解加入的逻辑意义,在哪里,分组...... –