2017-07-31 85 views
1

我有一个的大数据帧有222列我想要做像下面的例子认沽值从一行到另一阶dataaframe

|id  |day   |col1 |col2 | col3 .................... 
+----------+----------------+-------+-----+ 
|  329|    0| null|2.0 
|  329|    42| null|null 
|  329|    72| 5.55|null 
|  329|    106| null|null 
|  329|    135| null|3.0 
|  329|    168| null|4.0 
|  329|    189| 4.995|null 
|  329|    212| null|6.0 
|  329|    247| null|null 
|  329|    274| null|8.0 



|id  |  day  |col1 |col2 |....................... 
+----------+----------------+-------+-----+ 
|  329|    0| null|2.0 
|  329|    42| null|2.0 
|  329|    72| 5.55|2.0 
|  329|    106| 5.55|2.0 
|  329|    135| 5.55|3.0 
|  329|    168| 5.55|4.0 
|  329|    189| 4.995|4.0 
|  329|    212| 4.995|6.0 
|  329|    247| 4.995|6.0 
|  329|    274| 4.995|8.0 
. 
. 
. 
. 
. 

1.read行1 2.我有85K的唯一ID的和每个ID有10个结果(只有一个ID的示出的示例)3.如果 在第2行的数据不存在,则把它从ID的前一行

我得到导致这样

id   | day   |original_col1 |Result_col1|prevValue| 
+----------+----------------+--------------+-----------+---------+ 
|  329|    0| null  | null |  null| 
|  329|    42| null  | null |  null| 
|  329|    72| 5.55  | 5.55 |  null| 
|  329|    106| null  | 5.55 |  5.55| 
|  329|    135| null  | null |  null| 
|  329|    168| null  | null |  null| 
|  329|    189| 4.995  | 4.995 |  null| 
|  329|    212| null  | 4.995 | 4.995| 
|  330|.................................................... 
|  330|..................................................... 
     . 
+0

是否有确定性的方式来排序数据,以便能够使用窗口函数(滞后)?我知道你希望在由col“id”定义的分区中应用上面的逻辑,但是除非你有办法定义一些排序(假定顺序很重要),对于id为“1”的分区,对于“col1”你可能会在第1/2/3行得到空值,结果会有所不同。如果数据没有排序,您可以尝试使用monotonically_increasing_id()函数在从文件/源读取数据后立即生成order_id。 – Traian

+0

我忘了添加一列请现在检查,ID是唯一的,强制性ID不过是用户而且每个ID都少于6条记录。 –

回答

3

你不能做到这一点与现有的窗函数(例如落后)。您需要使用类似的概念进行分区和排序,但需要使用定制逻辑来滚动非空值。

case class MyRec(id: Integer, day: Integer, col1: Option[Double], col2: Option[Double]) 

defined class MyRec 

scala> :paste 
// Entering paste mode (ctrl-D to finish) 

val ds = Seq(
    MyRec(329, 0, None, Some(2.0)), 
    MyRec(329, 42, None, None), 
    MyRec(329, 72, Some(5.55), None), 
    MyRec(329, 106, None, None), 
    MyRec(329, 135, None, Some(3.0)), 
    MyRec(329, 168, None, Some(4.0)), 
    MyRec(329, 189, Some(4.995), None), 
    MyRec(329, 212, None, Some(6.0)), 
    MyRec(329, 247, None, None), 
    MyRec(329, 274, None, Some(8.0)) 
).toDS() 

ds.printSchema() 
ds.show(false) 

val updated_ds = ds.repartition('id).sortWithinPartitions('id, 'day) 
    .mapPartitions(iter => { 
    var crtId: Integer = null 
    var prevId: Integer = null 
    var rollingVals = collection.mutable.Map[String, Option[Double]]() 
    for (rec <- iter) yield { 
     crtId = rec.id 

     // 1st record for new id 
     if (prevId == null || crtId != prevId) { 
     rollingVals = collection.mutable.Map[String, Option[Double]]() 
     prevId = crtId 
     } 

     rollingVals("col1") = if (rec.col1.isDefined) rec.col1 else rollingVals.getOrElse("col1", None) 
     rollingVals("col2") = if (rec.col2.isDefined) rec.col2 else rollingVals.getOrElse("col2", None) 
     MyRec(rec.id, rec.day, rollingVals("col1"), rollingVals("col2")) 
    } 
    }) 

updated_ds.printSchema() 
updated_ds.show(false) 

// Exiting paste mode, now interpreting. 

root 
|-- id: integer (nullable = true) 
|-- day: integer (nullable = true) 
|-- col1: double (nullable = true) 
|-- col2: double (nullable = true) 

+---+---+-----+----+ 
|id |day|col1 |col2| 
+---+---+-----+----+ 
|329|0 |null |2.0 | 
|329|42 |null |null| 
|329|72 |5.55 |null| 
|329|106|null |null| 
|329|135|null |3.0 | 
|329|168|null |4.0 | 
|329|189|4.995|null| 
|329|212|null |6.0 | 
|329|247|null |null| 
|329|274|null |8.0 | 
+---+---+-----+----+ 

root 
|-- id: integer (nullable = true) 
|-- day: integer (nullable = true) 
|-- col1: double (nullable = true) 
|-- col2: double (nullable = true) 

+---+---+-----+----+ 
|id |day|col1 |col2| 
+---+---+-----+----+ 
|329|0 |null |2.0 | 
|329|42 |null |2.0 | 
|329|72 |5.55 |2.0 | 
|329|106|5.55 |2.0 | 
|329|135|5.55 |3.0 | 
|329|168|5.55 |4.0 | 
|329|189|4.995|4.0 | 
|329|212|4.995|6.0 | 
|329|247|4.995|6.0 | 
|329|274|4.995|8.0 | 
+---+---+-----+----+ 

ds: org.apache.spark.sql.Dataset[MyRec] = [id: int, day: int ... 2 more fields] 
updated_ds: org.apache.spark.sql.Dataset[MyRec] = [id: int, day: int ... 2 more fields] 
+0

它的工作谢谢你 –

1

使用窗函数,然后案例时:

val df2 = df 
    .withColumn("prevValue", lag('col1, 1).over(Window.partitionBy('id).orderBy('day))) 
    .withColumn("col1", when('col1.isNull, 'prevValue).otherwise('col1)) 

进口也spark.implicits._

+0

我是新来的,所以请理解它可能很容易错误:重载的方法值滞后与替代品: (电子邮件:org.apache.spark.sql.Column,偏移量:Int,defaultValue:任何)org.apache.spark.sql .COLUMN (COLUMNNAME:字符串,偏移量:中等,默认值:任意)org.apache.spark.sql.Column (COLUMNNAME:字符串,偏移量:智力)org.apache.spark.sql.Column (例如:org.apache.spark.sql.Column,offset:Int)org.apache.spark.sql.Column 不能应用于(String) –

+0

@RahulNirdhar对不起,我忘了一个参数。现在它应该工作:) –

+0

谢谢,但它不工作,因为我想,它给出了错误的结果,请参阅结果的问题 –