2017-07-17 74 views
1

我正在研究将学生数据转换为间隔的小型项目。该程序只是读取数据,并从标记列中选择标记(整数),将其按升序排序后转换为间隔。任何一个可以帮我这个特殊部分,与许多感谢:如何从Scala中的列读取行

代码:

import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.Row 


case class Rating(mark: Int, classes: String, schooles: String, name: String) 

val Result = sc.textFile("hdfs://schools:9000/input/marks.csv").map(_.split(",")).map(p => Rating(p(0).toInt, p(1).trim, p(2).trim, p(3).trim)).toDF 
val all_marks = Result.groupBy("classes", "schooles","name").agg(collect_list("mark") as "marks",count("*") as "cnt").where($"cnt" > 10) 

val mrk=all_marks.select("marks") 

我需要帮助的部分:

mrk.foreach(
    var ascending=mrk.sort 
    var interval=ascending[0]+"-"+ascending[ascending.size] 
) 

我怎样才能读取的标记按行行,所以我可以对它们进行排序,并将它们转换为间隔。

回答

2

您可以创建一个用户自定义函数,从你的列表中创建一个新的领域作为间隔

下面是一个简单的例子,你已经计算列标记

import org.apache.spark.sql.functions._ 
val ddf1 = Seq(List(2,3,1), List(6,4,3)).toDF("marks") 

val testUdf = udf((list: Seq[Int]) => { 
    val ascending = list.sorted //sorts in ascending order 
    s"${ascending(0)} - ${ascending(ascending.size - 1)}" 
}) 

ddf1.withColumn("marks", testUdf($"marks")) 

输出:

+-----+ 
|marks| 
+-----+ 
|1 - 3| 
|3 - 6| 
+-----+ 

希望这有助于!

+0

非常感谢Shankar,但是如何将数据帧“val mrk = all_marks.select(”marks“)”转换为Seq和“val ddf1 = Seq(List(2,3,1),List(6,4, 3 ))。toDF(“marks”)“ –

+0

你为什么要转换为列表?我认为这不是好主意 –

+0

如果你真的想要mrk.rdd.map(r => r(0))。collect()这是你可以做的 –

1

它可以使用旁边的方式来获得这样的结果 - 数据帧转换为RDD与列表类型,适用的地图功能和转换RDD回数据帧:

mrk.rdd.map(_.getList[Int](0).toList).map(l => s"${l.min} - ${l.max}").toDF("marks") 

注:getList[Int]返回Java的utils.List键入并将其转换为Scala的列表,我们必须使用toList方法并导入scala.collection.JavaConversions._

也可以使用DataSet的API,而不是RDD:

mrk.map(_.getList[Int](0).toList).map(l => s"${l.min} - ${l.max}").toDF("marks") 
1

我需要使用一个WrappedArray得到UDF的工作,像这样:

case class Rating(mark: Int, classes: String, schooles: String, name: String) 

val Result = sc.parallelize(Seq(
    Rating(56, "classA", "SchoolA", "English"), 
    Rating(57, "classB", "SchoolA", "English"), 
    Rating(58, "classA", "SchoolA", "English"), 
    Rating(59, "classB", "SchoolA", "English"), 
    Rating(60, "classA", "SchoolA", "English"), 
    Rating(61, "classA", "SchoolA", "English"))).toDF() 


val toInterval = udf((marks: scala.collection.mutable.WrappedArray[Int]) => s"${marks.min}-${marks.max}") 

val all_marks = Result.groupBy("classes", "schooles","name").agg(collect_list("mark") as "marks",count("*") as "cnt") 

all_marks.select("marks").withColumn("interval", toInterval(col("marks"))).show() 

输出:

+----------------+--------+ 
|   marks|interval| 
+----------------+--------+ 
|[56, 58, 60, 61]| 56-61| 
|  [57, 59]| 57-59| 
+----------------+--------+