2016-06-13 130 views
1

如何在将take(5)调用到另一个RDD后将返回的集合转换为另一个RDD,以便我可以将输出文件中的前5个记录保存起来?Spark:scala - 如何将RDD的集合转换为另一个RDD

如果我使用saveAsTextfile它不让我使用takesaveAsTextFile在一起(这就是为什么你看到该行在下面注释)。它以排序顺序存储RDD中的所有记录,因此前5个记录是前5个国家,但我只想存储前5个记录 - 是否可以在RDD中转换集合[take(5)]?

val Strips = txtFileLines.map(_.split(",")) 
         .map(line => (line(0) + "," + (line(7).toInt + line(8).toInt))) 
         .sortBy(x => x.split(",")(1).trim().toInt, ascending=false) 
         .take(5) 
         //.saveAsTextFile("output\\country\\byStripsBar") 

解决方案: sc.parallelize(Strips, 1).saveAsTextFile("output\\country\\byStripsBar")

回答

2
val rowsArray: Array[Row] = rdd.take(5) 
val slicedRdd = sparkContext.parallelize(rowsArray, 1) 

slicedRdd.savesTextFile("specify path here") 
1

除非你绝对需要saveAsTextFile格式,我只想打印take(5)输出到使用简单的IO(如File)的文件。

否则,这里是罗嗦RDD唯一的解决办法:

scala> val rdd = sc.parallelize(5 to 1 by -1 map{x => (x, x*x)}) 
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[71] at parallelize at <console>:27 

scala> rdd.collect 
res1: Array[(Int, Int)] = Array((5,25), (4,16), (3,9), (2,4), (1,1)) 

scala> val top2 = rdd.sortBy(_._1).zipWithIndex.collect{case x if (x._2 < 2) => x._1} 
top2: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[79] at collect at <console>:29 

scala> top2.collect 
res2: Array[(Int, Int)] = Array((1,1), (2,4))