2017-03-06 50 views
1

我一直在试图获得org.apache.spark.sql.explode的动态版本,但没有运气:我有一个数据集,其日期列名为event_date,另一列名为no_of_days_gap。我想使用no_of_days_gap使用explode函数创建行的克隆。我最初的尝试是使用这样的:如何使用列值爆炸?

myDataset.withColumn("clone", explode(array((0 until col("no_of_days_gap")).map(lit): _*))) 

然而,col("no_of_days_gap")Column型和Int预期。我也尝试了其他各种方法。那么我怎么才能做到这一点?

P.S .:我设法使用map功能,然后打电话flatMap,但我真的很想了解如何让withColumn的方法工作。

回答

0

有关下列哪些?

scala> val diddy = Seq(
    | ("2017/03/07", 4), 
    | ("2016/12/09", 2)).toDF("event_date", "no_of_days_gap") 
diddy: org.apache.spark.sql.DataFrame = [event_date: string, no_of_days_gap: int] 

scala> diddy.flatMap(r => Seq.fill(r.getInt(1))(r.getString(0))).show 
+----------+ 
|  value| 
+----------+ 
|2017/03/07| 
|2017/03/07| 
|2017/03/07| 
|2017/03/07| 
|2016/12/09| 
|2016/12/09| 
+----------+ 

// use explode instead 

scala> diddy.explode("no_of_days_gap", "events") { n: Int => 0 until n }.show 
warning: there was one deprecation warning; re-run with -deprecation for details 
+----------+--------------+------+ 
|event_date|no_of_days_gap|events| 
+----------+--------------+------+ 
|2017/03/07|    4|  0| 
|2017/03/07|    4|  1| 
|2017/03/07|    4|  2| 
|2017/03/07|    4|  3| 
|2016/12/09|    2|  0| 
|2016/12/09|    2|  1| 
+----------+--------------+------+ 

如果你坚持withColumn,那么......是......它! 扣紧!

diddy 
    .withColumn("concat", concat($"event_date", lit(","))) 
    .withColumn("repeat", expr("repeat(concat, no_of_days_gap)")) 
    .withColumn("split", split($"repeat", ",")) 
    .withColumn("explode", explode($"split")) 
0

你必须使用UDF:

val range = udf((i: Integer) => (0 until i).toSeq) 

df 
    .withColumn("clone", range($"no_of_days_gap")) // Add range 
    .withColumn("clone", explode($"clone")) // Explode 
+0

在火花壳这将返回一个错误: '''阶> VAL范围= UDF((ⅰ:整数)=>(0直到ⅰ).toSeq) scala.MatchError:阶。 collection.immutable.Range(类scala.reflect.internal.Types $ ClassNoArgsTypeRef) at org.apache.spark.sql.catalyst.ScalaReflection $ .schemaFor(ScalaReflection.scala:692) at org.apache.spark.sql .catalyst.ScalaReflection $ .schemaFor(ScalaReflection.scala:671) at org.apache.spark.sql.functions $ .udf(functions.scala:3072) ... 48 elided''' – Diddy