2017-08-08 69 views

How do I get the lag of a column in a Spark streaming DataFrame?

I have data streaming into my Spark Scala application in this format:

id mark1 mark2 mark3 time 
uuid1 100 200 300 Tue Aug 8 14:06:02 PDT 2017 
uuid1 100 200 300 Tue Aug 8 14:06:22 PDT 2017 
uuid2 150 250 350 Tue Aug 8 14:06:32 PDT 2017 
uuid2 150 250 350 Tue Aug 8 14:06:52 PDT 2017 
uuid2 150 250 350 Tue Aug 8 14:06:58 PDT 2017 

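I read it into the columns id, mark1, mark2, mark3 and time. The time is also converted to a datetime format. I want to group this by id and get the lag of mark1, which gives the mark1 value of the previous row. Something like this: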

id mark1 mark2 mark3 prev_mark time 
uuid1 100 200 300 null  Tue Aug 8 14:06:02 PDT 2017 
uuid1 100 200 300 100  Tue Aug 8 14:06:22 PDT 2017 
uuid2 150 250 350 null  Tue Aug 8 14:06:32 PDT 2017 
uuid2 150 250 350 150  Tue Aug 8 14:06:52 PDT 2017 
uuid2 150 250 350 150  Tue Aug 8 14:06:58 PDT 2017 

Consider the DataFrame to be markDF. I have tried:

val window = Window.partitionBy("uuid").orderBy("timestamp")
val newerDF = newDF.withColumn("prev_mark", lag("mark1", 1, null).over(window))

It says that non-time-based windows are not supported on streaming DataFrames/Datasets.

I have also tried:

val window = Window.partitionBy("uuid").orderBy("timestamp").rowsBetween(-10, 10)
val newerDF = newDF.withColumn("prev_mark", lag("mark1", 1, null).over(window))

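to get a window over a few rows, which did not work either. A streaming window, something like window("timestamp", "10 minutes"), cannot be used with over() to compute the lag. I am very confused about how to do this. Any help would be great!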


Do you want the "lag" per batch, or over all of the streaming data? –

Answer


I would advise you to change the time column to String, as in

+-----+-----+-----+-----+----------------------------+
|id   |mark1|mark2|mark3|time                        |
+-----+-----+-----+-----+----------------------------+
|uuid1|100  |200  |300  |Tue Aug 8 14:06:02 PDT 2017 |
|uuid1|100  |200  |300  |Tue Aug 8 14:06:22 PDT 2017 |
|uuid2|150  |250  |350  |Tue Aug 8 14:06:32 PDT 2017 |
|uuid2|150  |250  |350  |Tue Aug 8 14:06:52 PDT 2017 |
|uuid2|150  |250  |350  |Tue Aug 8 14:06:58 PDT 2017 |
+-----+-----+-----+-----+----------------------------+

root 
|-- id: string (nullable = true) 
|-- mark1: integer (nullable = false) 
|-- mark2: integer (nullable = false) 
|-- mark3: integer (nullable = false) 
|-- time: string (nullable = true) 

After that, the following should work

df.withColumn("prev_mark", lag("mark1", 1).over(Window.partitionBy("id").orderBy("time"))) 

which should give you this output

+-----+-----+-----+-----+----------------------------+---------+
|id   |mark1|mark2|mark3|time                        |prev_mark|
+-----+-----+-----+-----+----------------------------+---------+
|uuid1|100  |200  |300  |Tue Aug 8 14:06:02 PDT 2017 |null     |
|uuid1|100  |200  |300  |Tue Aug 8 14:06:22 PDT 2017 |100      |
|uuid2|150  |250  |350  |Tue Aug 8 14:06:32 PDT 2017 |null     |
|uuid2|150  |250  |350  |Tue Aug 8 14:06:52 PDT 2017 |150      |
|uuid2|150  |250  |350  |Tue Aug 8 14:06:58 PDT 2017 |150      |
+-----+-----+-----+-----+----------------------------+---------+
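
For anyone who wants to try the snippet above end to end, here is a minimal sketch on a static (batch) DataFrame. The object name BatchLagExample, the helpers sample and addPrevMark, and the local SparkSession settings are assumptions made for illustration; only the column names and the lag expression come from the answer. Note that ordering by a string time column is lexicographic, which happens to give chronological order here only because every row shares the same date prefix.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.lag

// Hypothetical wrapper object; not part of the original answer.
object BatchLagExample {

  // Sample rows from the question as a static DataFrame, with time kept as a string.
  def sample(spark: SparkSession): DataFrame = {
    import spark.implicits._
    Seq(
      ("uuid1", 100, 200, 300, "Tue Aug 8 14:06:02 PDT 2017"),
      ("uuid1", 100, 200, 300, "Tue Aug 8 14:06:22 PDT 2017"),
      ("uuid2", 150, 250, 350, "Tue Aug 8 14:06:32 PDT 2017"),
      ("uuid2", 150, 250, 350, "Tue Aug 8 14:06:52 PDT 2017"),
      ("uuid2", 150, 250, 350, "Tue Aug 8 14:06:58 PDT 2017")
    ).toDF("id", "mark1", "mark2", "mark3", "time")
  }

  // Adds prev_mark: the mark1 of the previous row within the same id,
  // ordered by the (string) time column. The first row of each id gets null.
  def addPrevMark(df: DataFrame): DataFrame =
    df.withColumn(
      "prev_mark",
      lag("mark1", 1).over(Window.partitionBy("id").orderBy("time")))

  def main(args: Array[String]): Unit = {
    // Local session for experimentation only (assumed settings).
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("batch-lag")
      .getOrCreate()

    addPrevMark(sample(spark)).orderBy("id", "time").show(false)
    spark.stop()
  }
}
```

This only demonstrates the batch case; it does not change how Spark treats the same window expression on a streaming DataFrame.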

This won't work, because my DataFrame is a streaming DataFrame – PiKaY


Why do you think so? –


The over function does not support non-time-based windows – PiKaY
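
For the streaming case raised in these comments, one possible direction (not something confirmed anywhere in this thread) is to keep the last mark1 per id in Spark's arbitrary stateful processing, using flatMapGroupsWithState instead of a window. The following is a minimal sketch, assuming Spark 2.2 or later and a typed stream whose time column is already a timestamp. The names Mark, MarkWithPrev, StreamingLag, attachPrev and withPrevMark are hypothetical. Rows are ordered by time only within each micro-batch, so late rows that belong before an earlier batch are not handled.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Hypothetical row types mirroring the columns in the question.
case class Mark(id: String, mark1: Int, mark2: Int, mark3: Int, time: Timestamp)
case class MarkWithPrev(id: String, mark1: Int, mark2: Int, mark3: Int,
                        prev_mark: Option[Int], time: Timestamp)

object StreamingLag {

  // Pure helper: sorts one id's new rows by time and pairs each row with the
  // mark1 seen just before it. `last` is the mark1 carried over from earlier
  // batches. Returns the output rows and the value to remember next.
  def attachPrev(last: Option[Int], rows: Seq[Mark]): (Seq[MarkWithPrev], Option[Int]) = {
    var prev = last
    val out = rows.sortBy(_.time.getTime).map { r =>
      val row = MarkWithPrev(r.id, r.mark1, r.mark2, r.mark3, prev, r.time)
      prev = Some(r.mark1)
      row
    }
    (out, prev)
  }

  // Streaming wiring: one state entry (the last mark1) per id.
  def withPrevMark(marks: Dataset[Mark]): Dataset[MarkWithPrev] = {
    val spark = marks.sparkSession
    import spark.implicits._
    marks
      .groupByKey(_.id)
      .flatMapGroupsWithState[Int, MarkWithPrev](
        OutputMode.Append(), GroupStateTimeout.NoTimeout()) {
        (id: String, rows: Iterator[Mark], state: GroupState[Int]) =>
          val (out, last) = attachPrev(state.getOption, rows.toList)
          last.foreach(v => state.update(v)) // remember the newest mark1 for this id
          out.iterator
      }
  }

  // Exercises only the pure helper, simulating two micro-batches for uuid2.
  def main(args: Array[String]): Unit = {
    def ts(s: String): Timestamp = Timestamp.valueOf(s)
    val (batch1, state1) = attachPrev(None, Seq(
      Mark("uuid2", 150, 250, 350, ts("2017-08-08 14:06:32")),
      Mark("uuid2", 150, 250, 350, ts("2017-08-08 14:06:52"))))
    val (batch2, _) = attachPrev(state1, Seq(
      Mark("uuid2", 150, 250, 350, ts("2017-08-08 14:06:58"))))
    (batch1 ++ batch2).foreach(println)
  }
}
```

In a real job the input would be something like the streaming DataFrame converted with .as[Mark], and the result written with writeStream in append output mode. With NoTimeout the per-id state is never dropped, so a stream with many distinct ids may need a timeout policy instead.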