2015-02-24 182 views
1

我正在从Scro文件读取数据的Scala中的Spark工作。 开始是很简单:如何用Scala以优雅的方式处理Spark中的Avro

val path = "hdfs:///path/to/your/avro/folder" 
val avroRDD = sc.hadoopFile[AvroWrapper[GenericRecord], NullWritable, AvroInputFormat[GenericRecord]](path) 

但后来它不是优雅的,因为我需要的元组,即操作。

avroRDD.map(x => (x.get("value").asInstanceOf[Long],x.get("start_time").asInstanceOf[Long],x.get("level").asInstanceOf[Double],x.get("size").asInstanceOf[Long],x.get("category").asInstanceOf[String])). 
map(x => (asDate(x._2),(x._1,x._3,x._4,x._5))). 
reduceByKey((x,y) => (x._1+y._1,x._2+y._2,x._3+y._3,y._4)). 
map(x => List(x._1,x._2._1,x._2._2,x._2._3,x._2._4).mkString(",")) 
... 

我想使用地图,而不是元组,但如果我有几个不同的类型,即长和弦,它就会导致Map[String,Any]浇铸在每个操作。 即

avroRDD.map(x => Map("value" -> x.get("value").asInstanceOf[Long],"start_time" -> x.get("start_time").asInstanceOf[Long],"level" -> x.get("level").asInstanceOf[Double],"size" -> x.get("size").asInstanceOf[Long],"category" -> x.get("category").asInstanceOf[String])). 
map(x => (asDate(x.get("start_time).asInstanceOf[Long]),(x.get("value").asInstanceOf[Long],x.get("level").asInstanceOf[Double],x.get("size").asInstanceOf[Long],x.get("category").asInstanceOf[String]))). 
... 

替代解决方案是使用case类和包装值到它,但有时它会导致很多case类即定义:

case class TestClass(value: Long, level:Double, size:Long, category:String) 

avroRDD.map(x => (x.get("start_time").asInstanceOf[Long],TestClass(x.get("value").asInstanceOf[Long],x.get("level").asInstanceOf[Double],x.get("size").asInstanceOf[Long],x.get("category").asInstanceOf[String]))). 
map(x => (asDate(x._1),x._2)). 
reduceByKey((x,y) => (x.value+y.value,x.level+y.level,x.size+y.size,y.category)). 
map(x => List(x._1,x._2.value,x._2.level,x._2.size,x._2.category).mkString(",")) 
... 

我想知道是否有更好的办法在这种情况下处理通用记录 - 您不需要不断地转换为特定类型,并且可以对域名进行操作。像命名元组就可以完成这项工作。

你知道更好的方法吗?

你如何处理这种情况?

+0

正在使用特定的记录是否可以接受? – aaronman 2015-02-24 19:47:07

+0

不,我知道如果我将使用avro生成的类,我将有更简单的第一部分,但仍然选择值的子集仍然会是复杂的 - 后面的部分。 – baju 2015-02-24 19:48:41

+1

不知道如何使avro更好,但模式匹配可以清理深层嵌套的元组 – aaronman 2015-02-24 20:58:33

回答

2

随着模式匹配:

map { case (value, startTime, level, size, category) => 
    (asDate(startTime), (value,level,size,category)) 
}.reduceByKey { case ((value1, level1, size1, category1), (value2, level2, size2, category2)) => 
    (value1+value2, level1+level2, size1+size2, category2) 
}.map { case (startTime, (value, level, size, category)) => 
    List(startTime, value, level, size, category).mkString(",")) 
} 

如果你有一些元组其中获得经常重复使用,用例类他们。

+0

对于频繁重用,您也可以提取本地函数/方法,清理一点 – 2015-02-25 13:58:03