2017-03-07 85 views
1

我在Scala中的DataFrame中存在将值置换的问题。我最初的DataFrame看起来是这样的:Spark:将UDF应用于数据框根据DF中的值生成新列

+----+----+----+----+ 
|col1|col2|col3|col4| 
+----+----+----+----+ 
| A| X| 6|null| 
| B| Z|null| 5| 
| C| Y| 4|null| 
+----+----+----+----+ 

col1col2String类型和col3col4Int

而结果应该是这样的:

+----+----+----+----+------+------+------+ 
|col1|col2|col3|col4|AXcol3|BZcol4|CYcol4| 
+----+----+----+----+------+------+------+ 
| A| X| 6|null|  6| null| null| 
| B| Z|null| 5| null|  5| null| 
| C| Y| 4| 4| null| null|  4| 
+----+----+----+----+------+------+------+ 

这意味着三个新列后应col1col2并提取值的列被命名。提取的值来自列col2,col3col5,取决于哪个值不是null

那么如何实现呢?我首先想到的一个UDF这样的:

def myFunc (col1:String, col2:String, col3:Long, col4:Long) : (newColumn:String, rowValue:Long) = { 
    if col3 == null{ 
     val rowValue=col4; 
     val newColumn=col1+col2+"col4"; 
    } else{ 
     val rowValue=col3; 
     val newColumn=col1+col2+"col3"; 
    } 
    return (newColumn, rowValue); 
} 

val udfMyFunc = udf(myFunc _) //needed to treat it as partially applied function 

但我怎么能以正确的方式把它从数据帧?

当然,上面的所有代码都是垃圾,可能有更好的方法。因为我只是玩杂耍的第一个代码片段让我知道...比较Int值到null已不起作用。

任何帮助表示赞赏!谢谢!

+0

[Apache Spark - 将UDF的结果分配给多个数据帧列]可能的重复(http://stackoverflow.com/questions/35322764/apache-spark-assign-the-result-of-udf-to-多个数据帧列) – jwvh

回答

0

好的,我有一个解决方法来实现我想要的。我执行以下操作:

(1)I生成包含的元组的新列与[newColumnName,rowValue]以下这个建议Derive multiple columns from a single column in a Spark DataFrame

case class toTuple(newColumnName: String, rowValue: String) 

def createTuple (input1:String, input2:String) : toTuple = { 
    //do something fancy here 
    var column:String= input1 + input2 
    var value:String= input1   
    return toTuple(column, value) 
} 

val UdfCreateTuple = udf(createTuple _) 

(2)应用功能DataFrame

dfNew= df.select($"*", UdfCreateTuple($"col1",$"col2").alias("tmpCol") 

(3)创建具有不同值的数组newColumnName

val dfDistinct = dfNew.select($"tmpCol.newColumnName").distinct 

(4)创建具有不同值

var a = dfDistinct.select($"newCol").rdd.map(r => r(0).asInstanceOf[String]) 

var arrDistinct = a.map(a => a).collect() 

(5)创建密钥值映射

var seqMapping:Seq[(String,String)]=Seq() 
for (i <- arrDistinct){ 
    seqMapping :+= (i,i) 
} 

(6)应用映射原始数据帧,比照一个数组Mapping a value into a specific column based on annother column

val exprsDistinct = seqMapping.map { case (key, target) => 
    when($"tmpCol.newColumnName" === key, $"tmpCol.rowValue").alias(target) } 

val dfFinal = dfNew.select($"*" +: exprsDistinct: _*) 

嗯,这是有点麻烦,但我可以得到一组新的列不知道有多少,并在同一时间的价值转移到新的列。

评论赞赏!也许有更快的方法?

最佳,肯

+0

嗨,肯,我回答你的问题,迟到比我猜不到! – LucieCBurgess

1

我正好与我自己的数据框同样的问题,所以我想我会分享答案(即使它是你问:-)后10个月,我碰到你的问题,我想答案可能对其他人有帮助。有一个简单的方法:

val df3 = df2.withColumn("newCol", concat($"col1", $"col2")) //Step 1 
      .withColumn("value",when($"col3".isNotNull, $"col3").otherwise($"col4")) //Step 2 
      .groupBy($"col1",$"col2",$"col3",$"col4",$"newCol") //Step 3 
      .pivot("newCol") // Step 4 
      .agg(max($"value")) // Step 5 
      .orderBy($"newCol") // Step 6 
      .drop($"newCol") // Step 7 

     df3.show() 

步骤如下工作:

  1. 添加含有COL1与COL2
  2. //添加新列,“值链接的内容的新列“,其中包含col3或col4的非空内容
  3. GroupBy所需的列
  4. 在newCol上转动,其中包含现在为列标题的值
  5. 按值的最大值进行聚合,如果groupBy是每个组的单值,则该值将成为值本身;或可替代.agg(first($"value"))如果值恰好是一个字符串,而不是一个数值类型 - 最大功能只能通过NEWCOL应用于数字类型
  6. 为了使DF是按升序排列
  7. 降本栏目为你不再需要它,或者跳过这一步,如果你想要一列没有空值的数值

由于@ user8371915的帮助,我首先回答了我自己的关键问题。

结果如下:

+----+----+----+----+----+----+----+ 
|col1|col2|col3|col4| AX| BZ| CY| 
+----+----+----+----+----+----+----+ 
| A| X| 6|null| 6|null|null| 
| B| Z|null| 5|null| 5|null| 
| C| Y| 4| 4|null|null| 4| 
+----+----+----+----+----+----+----+ 

您可能需要玩的列标题字符串连接来获得正确的结果。

+1

@ user8371915,很好的编辑! :-) – LucieCBurgess

相关问题