2016-03-21 33 views
0

我有一大堆的特点是note列在火花事件(用户点击/动作/按下按钮)的:Sessionization火花

>>> df.show(20) 
+-------+-------------+------------+------+ 
| user| timestamp|  note|action| 
+-------+-------------+------------+------+ 
|2376466|1458580817381|event #1 ...|UPDATE| 
|2376466|1458580822034|event #1 ...|UPDATE| 
|2376466|1458580822112|event #2 ...|UPDATE| 
|2376466|1458580822166|event #2 ...|UPDATE| 
|2376466|1458580822216|event #2 ...|UPDATE| 
|2376466|1458580822225|event #2 ...|UPDATE| 
|2376466|1458580822651|event #1 ...|UPDATE| 
|2376466|1458580822660|event #1 ...|UPDATE| 
+-------+-------------+------------+------+ 

我想知道一个“会话”的持续时间特别是note。例如,事件#2开始于1458580822112并结束于1458580822225,因此持续时间将是..225 - ..112 = 113毫秒。是否有任何火花助手或快捷方式将数据组织到“”会话“或其他方式来提取这样的信息?或者想法是不断地向每行添加额外的状态信息并在会话标识符列准备就绪时将其卷起?

注意:同一类型的多个音符应该被认为是单独的会话

+0

据我了解你的意图可能与窗口功能,但它不漂亮或特别有效。尽管如此,我已经发布了一些类似问题的答案。总的来说,实现像这样的RDDs(可能还包含数据集)可能更容易,但这是一个相当广泛的问题。 – zero323

+0

你介意在表面挖掘类似的问题吗? – Oleksiy

+0

如果我不忘记,我会在明天搜索,但它几乎是三个基本步骤:1)确定在“会话”之间切换(滞后)2)添加会话标识符(切换点的累计和)3)某些统计信息 – zero323

回答

2

你可以利用星火-SQL来acheive你的目标 下面是一些代码,对我的作品,其会给出会话 你可以编写一个辅助函数,然后将其注册为UDF 然后可以在你的SQL语句中调用这个UDF

df.registerTempTable("Events")  
import sqlContext.implicits._ 

# (You can modify this according to what exact value have in note column.) 

def process(colname: String):String = {  
    return colname.substring(0,8)  
} 

sqlContext.udf.register("process",process _)  
val dd = sqlContext.sql("select timestamp as timestamp, process(note) as note from Events") 

dd.registerTempTable("SubEvents") 

val dt = sqlContext.sql("select last(timestamp) - first(timestamp) as session, note as note from SubEvents group by note") 

dt.show()  
+--------+--------+  
|session| note|  
+--------+--------+  
|  5|event #1|  
|  2|event #2|  
|  1|event #3|  
+--------+--------+ 

而且完全bluemix火花笔记本可以看作here: -

感谢,

查尔斯。

+0

请格式化您的代码,这是不可读的! – eliasah

+0

这很有帮助,但它会将所有会话合并为一个,并确定所有事件的“最小”和“最大”时间。我想保持多个连续的事件为一个“会话” – Oleksiy

+0

所以你想按用户分组? –