2016-06-14 75 views
4

我正在使用Spark Dataset(Spark 1.6.1版本)。 下面是我的代码如何使用spark组数据集

object App { 

val conf = new SparkConf() 
.setMaster("local") 
.setAppName("SparkETL") 

val sc = new SparkContext(conf) 
sc.setLogLevel("ERROR") 
val sqlContext = new SQLContext(sc); 
import sqlContext.implicits._ 

} 

override def readDataTable(tableName:String):DataFrame={ 
val dataFrame= App.sqlContext.read.jdbc(JDBC_URL, tableName, JDBC_PROP); 
return dataFrame; 
} 


case class Student(stud_id , sname , saddress) 
case class Student(classid, stud_id, name) 


var tbl_student = JobSqlDAO.readDataTable("tbl_student").filter("stud_id = '" + studId + "'").as[Student].as("tbl_student") 

var tbl_class_student = JobSqlDAO.readDataTable("tbl_class_student").as[StudentClass].as("tbl_class_student") 


var result = tbl_class_student.joinWith(tbl_student, $"tbl_student.stud_id" === $"tbl_class_student.stud_id").as("ff") 

现在我想BY子句对多个列执行组? 如何做到这一点? result.groupBy(_._1._1.created_at)这样我可以做到吗? 如果是的话,那么我不能看到作为一个组的结果也是如何在多列上做到这一点?

回答

0

如果我已经正确理解了您的要求,那么您最好的选择是在PairRDDFunctions类中使用reduceByKey函数。

函数的签名是​​,它只是表示您使用一系列键/值对。

让我解释一下工作流程:

  1. 你找回你MANT与之合作的集(在你的代码:result
  2. 随着RDD map功能拆分的结果包含两个元组集(例如:result.map(row => ((row.key1, row.key2), (row.value1, row.value2))
  3. 现在你有一个RDD [(K,V)],其中类型K是键字段元组的类型,V是类型值字段元组
  4. 您可以直接使用reduceByKey通过传递(V,V) => V类型的函数,聚合值(例如:(agg: (Int, Int), val: (Int, Int)) => (agg._1 + val._1, agg._2 + val._2)

请注意:

  • 你必须从聚合函数返回相同的值类型
  • 您必须导入org.apache.spark.SparkContext._才能自动使用PairRDDFunctions实用功能
  • groupBy相同的推理,您必须从第e启动RDD到一对RDD[K,V],但是您没有聚合函数,因为您只是将值存储在seq中以用于进一步计算
  • 如果您需要聚合的起始值(例如:用于计数),请改用foldByKey功能