2016-05-14 64 views
0

专栏中,我想与以前的日期值创建一个新的列ID的组(日期减去当前日期)为以下数据框使用星火窗函数导致创建数据帧

+---+----------+-----+ 
| id|  date|value| 
+---+----------+-----+ 
| a|2015-04-11| 300| 
| a|2015-04-12| 400| 
| a|2015-04-12| 200| 
| a|2015-04-12| 100| 
| a|2015-04-11| 700| 
| b|2015-04-02| 100| 
| b|2015-04-12| 100| 
| c|2015-04-12| 400| 
+---+----------+-----+ 

我已经尝试过导入窗口功能。

val df1=Seq(("a","2015-04-11",300),("a","2015-04-12",400),("a","2015-04-12",200),("a","2015-04-12",100),("a","2015-04-11",700),("b","2015-04-02",100),("b","2015-04-12",100),("c","2015-04-12",400)).toDF("id","date","value") 

var w1=Window.partitionBy("id").orderBy("date".desc) 
var leadc1=lead(df1("value"),1).over(w1) 
val df2=df1.withColumn("nvalue",leadc1) 

+---+----------+-----+------+             
| id|  date|value|nvalue| 
+---+----------+-----+------+ 
| a|2015-04-12| 400| 200| 
| a|2015-04-12| 200| 100| 
| a|2015-04-12| 100| 300| 
| a|2015-04-11| 300| 700| 
| a|2015-04-11| 700| null| 
| b|2015-04-12| 100| 100| 
| b|2015-04-02| 100| null| 
| c|2015-04-12| 400| null| 
+---+----------+-----+------+ 

但是,正如我们可以看到当我有ID相同的日期“为”我收到错误result.The结果应该是像

+---+----------+-----+------+             
| id|  date|value|nvalue| 
+---+----------+-----+------+ 
| a|2015-04-12| 400| 300| 
| a|2015-04-12| 200| 300| 
| a|2015-04-12| 100| 300| 
| a|2015-04-11| 300| null| 
| a|2015-04-11| 700| null| 
| b|2015-04-12| 100| 100| 
| b|2015-04-02| 100| null| 
| c|2015-04-12| 400| null| 
+---+----------+-----+------+ 

我已经有使用连接,虽然我是一个解决方案使用窗口函数寻找解决方案。

谢谢

回答

0

问题是你有多行具有相同的日期。 lead将取value从下一个在结果集中,而不是下一个日期。因此,当您按降序对日期进行排序时,下一行可能是同一日期。

如何识别用于特定日期的正确值?例如你为什么要从(id = a,date = 2015-04-11)取300,而不是700?

要做到这一点与窗口功能,你可能需要做多次通过 - 这将采取最后nvalue并将其应用于同一个id /日期分组中的所有行 - 但我不知道你的行最初是如何排序的。

val df1=Seq(("a","2015-04-11",300),("a","2015-04-12",400),("a","2015-04-12",200),("a","2015-04-12",100),("a","2015-04-11",700),("b","2015-04-02",100),("b","2015-04-12",100),("c","2015-04-12",400)).toDF("id","date","value") 

var w1 = Window.partitionBy("id").orderBy("date".desc) 
var leadc1 = lead(df1("value"),1).over(w1) 
val df2 = df1.withColumn("nvalue",leadc1) 
val w2 = Window.partitionBy("id", "date").orderBy("??? some way to distinguish row ordering") 
val df3 = df1.withColumn("nvalue2", last_value("nvalue").over(w2))