2016-08-10 41 views
1

利用窗口函数来过滤数据,我有以下数据:如何火花

rowid uid time code 
    1 1  5 a 
    2 1  6 b 
    3 1  7 c 
    4 2  8 a 
    5 2  9 c 
    6 2  9 c 
    7 2  10 c 
    8 2  11 a 
    9 2  12 c 

现在我想筛选这样的数据,我可以删除这些行6,7为特定UID我想在代码仅保存一行与值“C”

所以预期的数据应该是:

rowid uid time code 
    1 1  5 a 
    2 1  6 b 
    3 1  7 c 
    4 2  8 a 
    5 2  9 c 
    8 2  11 a 
    9 2  12 c 

我使用的窗函数是这样的:

val window = Window.partitionBy("uid").orderBy("time") 
val change = ((lag("code", 1).over(window) <=> "c")).cast("int") 

这将帮助我们识别代码为'c'的每一行。我可以延长这个过滤掉行,以获得预期的数据

+0

你能澄清你的要求吗?对于每个UID,您是否只想为每个代码保留一行,或者仅针对代码“c”,您需要执行此操作? – mattinbits

回答

1

如果你只想删除行其代码=“C”(除了第一个每个UID),你可以尝试以下方法:

val window = Window.partitionBy("uid", "code").orderBy("time") 
val result = df 
    .withColumn("rank", row_number().over(window)) 
    .where(
    (col("code") !== "c") || 
    col("rank") === 1 
) 
    .drop("rank") 

编辑基于新的信息:

val window = Window.partitionBy("uid").orderBy("time") 
val result = df 
    .withColumn("lagValue", coalesce(lag(col("code"), 1).over(window), lit(""))) 
    .where(
    (col("code") !== "c") || 
    (col("lagValue") !== "c") 
) 
    .drop("lagValue") 
+1

在使用上面的代码时,当我做partitionBy(“uid”,“code”)时,得到的数据集不正确,因为这会得出以下结果: rowid uid时间码 – hbabbar

+0

@hbabbar,它为什么不正确? –

+0

道歉,错过上传整个注释.. 所以得到的DF是一样的东西: 使用上面的代码,当我这样做 VAL窗口= Window.partitionBy(“UID”,“代码”)排序依据( “时间”) df.withColumn( “等级”,ROW_NUMBER()在(窗)) 所得数据集是不正确,因为这给出以下结果: rowid的UID时间码秩 4 2 8 a 2 2 1 6 b 1 3 1 7 c 1 5 2 9 c 1 因此,我在第e grouping on uid – hbabbar