
I want to implement logic to flatten records using the Spark/Scala API, based on a key column. I am trying to use the map function.

Could you please suggest the simplest way to solve this?

Suppose that for a given key there are 3 process codes.

Input DataFrame ->

Keycol|processcode 
John |1 
Mary |8 
John |2 
John |4 
Mary |1 
Mary |7 


Output DataFrame ->

Keycol|processcode1|processcode2|processcode3 
John  |1           |2           |4 
Mary  |8           |1           |7 

Answers


Assuming the same number of rows per Keycol, one approach is to aggregate processcode into an array per Keycol and then expand it into three separate columns:

import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
    ("John", 1), 
    ("Mary", 8), 
    ("John", 2), 
    ("John", 4), 
    ("Mary", 1), 
    ("Mary", 7) 
).toDF("Keycol", "processcode") 

// Collect the process codes into one array per key
val df2 = df.groupBy("Keycol").agg(collect_list("processcode").as("processcode")) 

// Take the number of output columns from the size of the first row's array
val numCols = df2.select(size(col("processcode"))).as[Int].first 

// Expand the array into one column per element
val cols = (0 to numCols - 1).map(i => col("processcode")(i)) 

df2.select(col("Keycol") +: cols: _*).show 

+------+--------------+--------------+--------------+ 
|Keycol|processcode[0]|processcode[1]|processcode[2]| 
+------+--------------+--------------+--------------+ 
|  Mary|             8|             1|             7| 
|  John|             1|             2|             4| 
+------+--------------+--------------+--------------+ 
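If the processcode1/processcode2/processcode3 headers from the expected output are wanted, the generated columns can also be aliased; a small variation on the snippet above (the .as names are simply taken from the question's desired header):

// Alias each array element as processcode1, processcode2, ... (names taken from the question)
val named = (0 to numCols - 1).map(i => col("processcode")(i).as(s"processcode${i + 1}"))

df2.select(col("Keycol") +: named: _*).show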

Thanks Leo, the following line solved my problem: val df2 = df.groupBy("Keycol").agg(collect_list("processcode").as("processcode")). Thanks for your quick help. –


Glad it helped. Could you close out the question by accepting the answer? –


A couple of alternative approaches:

SQL

df.createOrReplaceTempView("tbl") 

// Collect the process codes per key, then index into the array
val q = """ 
select keycol, 
       c[0] processcode1, 
       c[1] processcode2, 
       c[2] processcode3 
  from (select keycol, collect_list(processcode) c 
          from tbl 
         group by keycol) t0 
""" 

sql(q).show 

Result:

scala> sql(q).show 
+------+------------+------------+------------+ 
|keycol|processcode1|processcode2|processcode3| 
+------+------------+------------+------------+ 
|  Mary|           1|           7|           8| 
|  John|           4|           1|           2| 
+------+------------+------------+------------+ 

PairRDDFunctions (groupByKey) + mapPartitions

import org.apache.spark.sql.Row 
import org.apache.spark.sql.types.{StringType, IntegerType, StructField, StructType} 

// Group the process codes per key as an RDD of (key, List[Int])
val my_rdd = df.map { case Row(a1: String, a2: Int) => (a1, a2) }
  .rdd.groupByKey().map(t => (t._1, t._2.toList)) 

// Turn each (key, codes) pair into a Row holding the key and the first three codes
def f(iter: Iterator[(String, List[Int])]): Iterator[Row] = { 
  var res = List[Row]() 
  while (iter.hasNext) { 
    val (keycol: String, c: List[Int]) = iter.next 
    res = res ::: List(Row(keycol, c(0), c(1), c(2))) 
  } 
  res.iterator 
} 

val schema = new StructType() 
  .add(StructField("Keycol", StringType, true)) 
  .add(StructField("processcode1", IntegerType, true)) 
  .add(StructField("processcode2", IntegerType, true)) 
  .add(StructField("processcode3", IntegerType, true)) 

spark.createDataFrame(my_rdd.mapPartitions(f, true), schema).show 

Result:

scala> spark.createDataFrame(my_rdd.mapPartitions(f, true), schema).show 
+------+------------+------------+------------+ 
|Keycol|processcode1|processcode2|processcode3| 
+------+------------+------------+------------+ 
|  Mary|           1|           7|           8| 
|  John|           4|           1|           2| 
+------+------------+------------+------------+ 

Note that in all cases the order of the values in the processcode columns is non-deterministic unless explicitly specified.
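For example, if a sorted order per key is acceptable, sorting the collected array makes the expanded columns deterministic. A minimal sketch building on the first answer's df (note that sort_array gives ascending order, e.g. Mary -> 1, 7, 8, which is not the input order shown in the question):

import org.apache.spark.sql.functions._

// Sort the collected array so the expanded columns come out in a fixed (ascending) order
val sortedDf = df.groupBy("Keycol")
  .agg(sort_array(collect_list("processcode")).as("processcode"))

sortedDf.select(
  col("Keycol"),
  col("processcode")(0).as("processcode1"),
  col("processcode")(1).as("processcode2"),
  col("processcode")(2).as("processcode3")
).show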