2016-06-09 42 views
0

我需要使用Scala API在Spark中编写一个场景。 我将一个用户定义的函数传递给一个数据框,它逐个处理数据框的每一行并返回元组(Row,Row)。我如何将RDD(行,行)更改为Dataframe(行)?见下面的代码示例 -Spark - 如何将地图功能输出(行,行)元组转换为一个Dataframe

**Calling map function-** 
    val df_temp = df_outPut.map { x => AddUDF.add(x,date1,date2)} 
**UDF definition.** 
    def add(x: Row,dates: String*): (Row,Row) = { 
...................... 
........................ 
    var result1,result2:Row = Row() 
.......... 
    return (result1,result2) 

现在df_temp是一个RDD(Row1,Row2)。我的要求是通过将元组元素分解为1个RDD或Dataframe的记录来使其成为一个RDD或Dataframe RDD(行)。感谢你的帮助。

+0

如何将两行元素组合起来?第二个列应该附加到第一个列?可能在两行中都存在共同的列?没有这些信息,问题就不清楚了。 –

回答

2

您可以使用flatMap扁平化你的元组行,说如果我们从这个例子RDD开始:

rddExample.collect() 
// res37: Array[(org.apache.spark.sql.Row, org.apache.spark.sql.Row)] = Array(([1,2],[3,4]), ([2,1],[4,2])) 

val flatRdd = rddExample.flatMap{ case (x, y) => List(x, y) } 
// flatRdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[45] at flatMap at <console>:35 

要将其转换成数据帧。

import org.apache.spark.sql.types.{StructType, StructField, IntegerType} 

val schema = StructType(StructField("x", IntegerType, true):: 
         StructField("y", IntegerType, true)::Nil)  
val df = sqlContext.createDataFrame(flatRdd, schema) 
df.show 
+---+---+ 
| x| y| 
+---+---+ 
| 1| 2| 
| 3| 4| 
| 2| 1| 
| 4| 2| 
+---+---+ 
+0

像魅力一样工作。万分感谢 :) –

相关问题