2015-07-19

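Removing redundant rows from time series data in a Spark DataFrame

I have a Spark DataFrame that looks like this (timestamp and id column values simplified for clarity):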

| Timestamp | id | status      |
---------------------------------
|         1 |  1 | pending     |
|         2 |  2 | pending     |
|         3 |  1 | in-progress |
|         4 |  1 | in-progress |
|         5 |  3 | in-progress |
|         6 |  1 | pending     |
|         7 |  4 | closed      |
|         8 |  1 | pending     |
|         9 |  1 | in-progress |

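This is a time series of status events. What I want to end up with is only the rows that represent a status change. In that sense, the problem can be seen as one of removing redundant rows: for example, the entries at times 4 and 8, both for id = 1, should be removed because they do not represent a change of status for the given id.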

For the set of rows above, this would give (order is not important):

| Timestamp | id | status      |
---------------------------------
|         1 |  1 | pending     |
|         2 |  2 | pending     |
|         3 |  1 | in-progress |
|         5 |  3 | in-progress |
|         6 |  1 | pending     |
|         7 |  4 | closed      |
|         9 |  1 | in-progress |

My original plan was to partition by id and status, order by timestamp, and pick the first row of each partition. However, this would give

| Timestamp | id | status      |
---------------------------------
|         1 |  1 | pending     |
|         2 |  2 | pending     |
|         3 |  1 | in-progress |
|         5 |  3 | in-progress |
|         7 |  4 | closed      |

i.e. it loses the repeated status changes (the rows at times 6 and 9 for id = 1).
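To make the failure concrete, here is a minimal sketch of that plan on plain Scala collections (no Spark involved). The names `NaivePlan`, `sample`, and `firstPerIdAndStatus` are hypothetical and exist only for this illustration; grouping by `(id, status)` stands in for the partitioning step.

```scala
object NaivePlan {
  final case class Event(timestamp: Int, id: Int, status: String)

  // The same nine events as in the table above.
  val sample: List[Event] = List(
    Event(1, 1, "pending"), Event(2, 2, "pending"),
    Event(3, 1, "in-progress"), Event(4, 1, "in-progress"),
    Event(5, 3, "in-progress"), Event(6, 1, "pending"),
    Event(7, 4, "closed"), Event(8, 1, "pending"),
    Event(9, 1, "in-progress")
  )

  // Group by (id, status) and keep only the earliest event of each group.
  // Every later return to an already-seen status falls into an existing
  // group, so it can never be selected.
  def firstPerIdAndStatus(events: List[Event]): List[Event] =
    events
      .groupBy(e => (e.id, e.status))
      .values
      .map(_.minBy(_.timestamp))
      .toList
      .sortBy(_.timestamp)
}
```

Because the group key ignores what came immediately before each event, the second "pending" and second "in-progress" phases of id = 1 collapse into the first ones.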

Any pointers appreciated; I am new to DataFrames and may be missing a trick or two.

Answer

Using the lag window function should do the trick:

// Assumes a spark-shell session where `sc` and `sqlContext` are predefined.
// In Spark 1.x, window functions such as lag require `sqlContext` to be a HiveContext.
case class Event(timestamp: Int, id: Int, status: String)

val events = sqlContext.createDataFrame(sc.parallelize(
    Event(1, 1, "pending") :: Event(2, 2, "pending") ::
    Event(3, 1, "in-progress") :: Event(4, 1, "in-progress") ::
    Event(5, 3, "in-progress") :: Event(6, 1, "pending") ::
    Event(7, 4, "closed") :: Event(8, 1, "pending") ::
    Event(9, 1, "in-progress") :: Nil
))

// Make the DataFrame visible to SQL under the name "events".
events.registerTempTable("events")

// Inner query: attach the previous status per id; outer query: keep only changes.
val query = """SELECT timestamp, id, status FROM (
    SELECT timestamp, id, status, lag(status) OVER (
        PARTITION BY id ORDER BY timestamp
    ) AS prev_status FROM events) tmp
    WHERE prev_status IS NULL OR prev_status != status
    ORDER BY timestamp, id"""

sqlContext.sql(query).show
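
The same idea can probably also be written with the DataFrame API instead of a SQL string. The following is a sketch, not part of the original answer, and it assumes Spark 2.0 or later (`SparkSession` and the `=!=` column operator); the names `LagDataFrameSketch`, `sampleEvents`, and `statusChanges` are hypothetical.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag}

object LagDataFrameSketch {
  final case class Event(timestamp: Int, id: Int, status: String)

  // Builds the example data from the question as a DataFrame.
  def sampleEvents(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      Event(1, 1, "pending"), Event(2, 2, "pending"),
      Event(3, 1, "in-progress"), Event(4, 1, "in-progress"),
      Event(5, 3, "in-progress"), Event(6, 1, "pending"),
      Event(7, 4, "closed"), Event(8, 1, "pending"),
      Event(9, 1, "in-progress")
    ).toDF()
  }

  // Keeps only rows whose status differs from the previous row of the same id.
  def statusChanges(events: DataFrame): DataFrame = {
    // One window per id, ordered by time, mirroring PARTITION BY id ORDER BY timestamp.
    val byId = Window.partitionBy("id").orderBy("timestamp")
    events
      .withColumn("prev_status", lag(col("status"), 1).over(byId))
      .where(col("prev_status").isNull || col("prev_status") =!= col("status"))
      .drop("prev_status")
      .orderBy("timestamp", "id")
  }
}
```

In a Spark 2.x shell, something like `LagDataFrameSketch.statusChanges(LagDataFrameSketch.sampleEvents(spark)).show()` should print the same rows as the SQL version.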

The inner query

SELECT timestamp, id, status, lag(status) OVER (
    PARTITION BY id ORDER BY timestamp 
) AS prev_status FROM events 

creates the table below, where prev_status is the previous value of status for a given id, with rows ordered by timestamp.

+---------+--+-----------+-----------+
|timestamp|id|     status|prev_status|
+---------+--+-----------+-----------+
|        1| 1|    pending|       null|
|        3| 1|in-progress|    pending|
|        4| 1|in-progress|in-progress|
|        6| 1|    pending|in-progress|
|        8| 1|    pending|    pending|
|        9| 1|in-progress|    pending|
|        2| 2|    pending|       null|
|        5| 3|in-progress|       null|
|        7| 4|     closed|       null|
+---------+--+-----------+-----------+

The outer query

SELECT timestamp, id, status FROM (...) 
WHERE prev_status IS NULL OR prev_status != status 
ORDER BY timestamp, id 

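simply keeps the rows where prev_status is NULL (the first row for a given id) or where prev_status differs from status (the status changed between consecutive timestamps). The ORDER BY is added only to make visual inspection easier.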
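
For readers who want to check the logic without a Spark installation, the two query steps can be mimicked on plain Scala collections. This is only an illustrative sketch: `LagSketch`, `sample`, and `statusChanges` are hypothetical names, and `None` plays the role of the NULL that lag produces for the first row of each id.

```scala
object LagSketch {
  final case class Event(timestamp: Int, id: Int, status: String)

  // The same nine events as in the question.
  val sample: List[Event] = List(
    Event(1, 1, "pending"), Event(2, 2, "pending"),
    Event(3, 1, "in-progress"), Event(4, 1, "in-progress"),
    Event(5, 3, "in-progress"), Event(6, 1, "pending"),
    Event(7, 4, "closed"), Event(8, 1, "pending"),
    Event(9, 1, "in-progress")
  )

  def statusChanges(events: List[Event]): List[Event] =
    events
      .groupBy(_.id) // PARTITION BY id
      .values
      .flatMap { perId =>
        val ordered = perId.sortBy(_.timestamp) // ORDER BY timestamp
        // Shift the statuses by one position: the i-th entry is the status
        // of the event before the i-th event, or None for the first one.
        val prevStatuses: List[Option[String]] =
          None :: ordered.map(e => Option(e.status))
        // Keep an event when there is no previous status or it differs.
        ordered.zip(prevStatuses).collect {
          case (e, prev) if !prev.contains(e.status) => e
        }
      }
      .toList
      .sortBy(e => (e.timestamp, e.id)) // ORDER BY timestamp, id
}
```

Calling `LagSketch.statusChanges(LagSketch.sample)` keeps the events at timestamps 1, 2, 3, 5, 6, 7 and 9, which matches the result requested in the question.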


Nice, thanks. lag FTW!