2017-06-19 182 views

Aggregating JSON objects, converting string timestamps to dates

I have JSON lines that look like the following:

[{"time":"2017-03-23T12:23:05","user":"randomUser","action":"sleeping"}]
[{"time":"2017-03-23T12:24:05","user":"randomUser","action":"sleeping"}]
[{"time":"2017-03-23T12:33:05","user":"randomUser","action":"sleeping"}]
[{"time":"2017-03-23T15:33:05","user":"randomUser2","action":"eating"}]
[{"time":"2017-03-23T15:33:06","user":"randomUser2","action":"eating"}]

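So I have 2 questions. First, all the times are stored as strings inside my DataFrame, and I believe they have to be dates before I can aggregate them?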

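Second, I need to aggregate this data in 5-minute intervals. For example, everything that happens from 2017-03-23T12:20:00 to 2017-03-23T12:24:59 needs to be aggregated and treated as having the timestamp 2017-03-23T12:20:00.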
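The bucketing rule described above can be sketched in plain Scala, without Spark, as "subtract the remainder modulo 300 seconds". This is only an illustration: the object name `FloorToFiveMinutes` and the helper `floorToBucket` are hypothetical, and the timestamps are assumed to be UTC.

```scala
import java.time.{LocalDateTime, ZoneOffset}

object FloorToFiveMinutes {
  // Bucket width in seconds (5 minutes).
  val BucketSeconds: Long = 5L * 60L

  // Round an ISO-8601 local timestamp down to the start of its 5-minute bucket.
  // Assumption: the timestamp is interpreted as UTC.
  def floorToBucket(iso: String): LocalDateTime = {
    val epoch = LocalDateTime.parse(iso).toEpochSecond(ZoneOffset.UTC)
    // Drop the seconds elapsed since the last 5-minute boundary.
    val floored = epoch - java.lang.Math.floorMod(epoch, BucketSeconds)
    LocalDateTime.ofEpochSecond(floored, 0, ZoneOffset.UTC)
  }

  def main(args: Array[String]): Unit = {
    Seq("2017-03-23T12:23:05", "2017-03-23T12:24:59", "2017-03-23T12:33:05")
      .foreach(t => println(s"$t -> ${floorToBucket(t)}"))
  }
}
```

With this rule, 12:23:05 and 12:24:59 both land in the bucket that starts at 12:20:00, while 12:33:05 lands in the one that starts at 12:30:00.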

The expected output is:

Thanks

Answer

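You can convert the StringType column into a TimestampType column with a cast. You can then cast the timestamp to IntegerType (seconds since the epoch) in order to "round" it down to the start of the most recent 5-minute interval, cast it back, and group by that column (and all the other columns):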

// Import the Spark SQL types and SparkSession's implicits (for the $"..." syntax)
import org.apache.spark.sql.types.{IntegerType, TimestampType}
import spark.implicits._

// Use casting to convert the String column into a Timestamp
val withTime = df.withColumn("time", $"time" cast TimestampType)

// Cast to epoch seconds, round down to the most recent 5-minute boundary,
// cast back to Timestamp, and group by all columns
val result = withTime.withColumn("time", $"time" cast IntegerType)
  .withColumn("time", ($"time" - ($"time" mod (60 * 5))) cast TimestampType)
  .groupBy("time", "user", "action").count()

result.show(truncate = false)
// +---------------------+-----------+--------+-----+
// |time                 |user       |action  |count|
// +---------------------+-----------+--------+-----+
// |2017-03-23 12:20:00.0|randomUser |sleeping|2    |
// |2017-03-23 15:30:00.0|randomUser2|eating  |2    |
// |2017-03-23 12:30:00.0|randomUser |sleeping|1    |
// +---------------------+-----------+--------+-----+
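
As an alternative to the integer arithmetic above (not part of the original answer), Spark's built-in `window` function from `org.apache.spark.sql.functions` can probably do the same bucketing. The sketch below is a minimal, self-contained version under some assumptions: a local SparkSession, the sample rows from the question entered by hand, and a hypothetical helper named `bucketCounts`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, window}
import org.apache.spark.sql.types.TimestampType

object WindowAggregation {
  // Hypothetical helper: count rows per (5-minute window, user, action).
  def bucketCounts(df: DataFrame): DataFrame =
    df.withColumn("time", col("time").cast(TimestampType))
      // window() assigns each row to a fixed 5-minute tumbling window
      .groupBy(window(col("time"), "5 minutes"), col("user"), col("action"))
      .count()
      // keep only the window start, which plays the role of the rounded timestamp
      .select(col("window.start").as("time"), col("user"), col("action"), col("count"))

  def main(args: Array[String]): Unit = {
    // Assumption: a local session is good enough for trying this out.
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("five-minute-buckets")
      .getOrCreate()
    import spark.implicits._

    // Sample rows copied from the question.
    val df = Seq(
      ("2017-03-23T12:23:05", "randomUser", "sleeping"),
      ("2017-03-23T12:24:05", "randomUser", "sleeping"),
      ("2017-03-23T12:33:05", "randomUser", "sleeping"),
      ("2017-03-23T15:33:05", "randomUser2", "eating"),
      ("2017-03-23T15:33:06", "randomUser2", "eating")
    ).toDF("time", "user", "action")

    bucketCounts(df).show(truncate = false)
    spark.stop()
  }
}
```

The `window` column is a struct with `start` and `end` fields, so selecting `window.start` yields the same kind of rounded-down timestamp as the cast-based version, without going through IntegerType.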