2015-12-30 106 views
1

对不起,有一个新手问题。Spark:如何根据userId和时间戳创建sessionId

目前我有日志文件,其中包含诸如userId,event和timestamp等字段,同时缺少sessionId。我的目标是根据时间戳和预先定义的值TIMEOUT为每条记录创建一个sessionId。

如果超时值是10,和样本数据帧是:

scala> eventSequence.show(false) 

    +----------+------------+----------+ 
    |uerId  |event  |timestamp | 
    +----------+------------+----------+ 
    |U1  |A   |1   | 
    |U2  |B   |2   | 
    |U1  |C   |5   | 
    |U3  |A   |8   | 
    |U1  |D   |20  | 
    |U2  |B   |23  | 
    +----------+------------+----------+ 

的目标是:

+----------+------------+----------+----------+ 
    |uerId  |event  |timestamp |sessionId | 
    +----------+------------+----------+----------+ 
    |U1  |A   |1   |S1  | 
    |U2  |B   |2   |S2  | 
    |U1  |C   |5   |S1  | 
    |U3  |A   |8   |S3  | 
    |U1  |D   |20  |S4  | 
    |U2  |B   |23  |S5  | 
    +----------+------------+----------+----------+ 

我发现在R(Create a "sessionID" based on "userID" and differences in "timeStamp")一个解决方案,而我不能在Spark中找出它。

感谢您对此问题的任何建议。

+0

的可能的复制[如何在Spark数据帧添加一列?(http://stackoverflow.com/questions/ 32788322 /如何添加列火花数据框) –

+0

这不是一个重复的问题作为链接的问题。链接的问题显示了“如何在DataFrame中添加新列”,而我需要的是“如何在DataFrame中计算新列值(如sessionId) – Torrence

回答

0

的 “如何创建一个新的栏目” 肖恩的回答问候,而我的目标是“如何根据时间戳创建sessionId列”。经过几天的努力,Window函数在这种情况下被用作一个简单的解决方案。因为火花1.4

窗口被引入,它提供的功能,需要这样的操作时:

两者上的一组行的操作,同时仍然为每一输入行

在返回单个值为了创建基于时间戳的sessionId,首先我需要获得用户A的两个直接操作之间的区别。 windowDef定义的Window将由“userId”分区,并按时间戳排序,然后diff是一个列,它将为每行返回一个值,它的值将是分区(组)中当前行之后的1行,或者null如果当前行是最后一行在此分区

def handleDiff(timeOut: Int) = { 
    udf {(timeDiff: Int, timestamp: Int) => if(timeDiff > timeOut) timestamp + ";" else timestamp + ""} 
} 
val windowDef = Window.partitionBy("userId").orderBy("timestamp") 
val diff: Column = lead(eventSequence("timestamp"), 1).over(windowDef) 
val dfTSDiff = eventSequence. 
withColumn("time_diff", diff - eventSequence("timestamp")). 
withColumn("event_seq", handleDiff(TIME_OUT)(col("time_diff"), col("timestamp"))). 
groupBy("userId").agg(GroupConcat(col("event_seq")).alias("event_seqs")) 

更新: 然后利用窗口函数应用(在大熊猫提供)“cumsum”般的操作:

// Define a Window, partitioned by userId (partitionBy), ordered by timestamp (orderBy), and delivers all rows before current row in this partition as frame (rowsBetween) 
val windowSpec = Window.partitionBy("userId").orderBy("timestamp").rowsBetween(Long.MinValue, 0) 
val sessionDf = dfTSDiff. 
    withColumn("ts_diff_flag", genTSFlag(TIME_OUT)(col("time_diff"))). 
    select(col("userId"), col("eventSeq"), col("timestamp"), sum("ts_diff_flag").over(windowSpec).alias("sessionInteger")). 
    withColumn("sessionId", genSessionId(col("userId"), col("sessionInteger"))) 

此前: 然后按“;”分割。并获得每个会话,创建一个sessionId;之后按“,”分解并爆炸至最终结果。因此sessionId是在字符串操作的帮助下创建的。 (这部分应该用累计和运算代替,但是我没有找到一个好的解决方案)

欢迎任何关于这个问题的想法或想法。


GroupConcat可以在这里找到:SPARK SQL replacement for mysql GROUP_CONCAT aggregate function

参考:databricks introduction

-1

dt.withColumn( '的sessionId',expression for the new column sessionId
例如:
dt.timestamp +预先定义的值TIMEOUT