2017-10-09 272 views
0

我需要对RDD进行排序。排序需要在我的记录的多个领域,因此我需要一个自定义比较器。使用spark进行排序

我看到sortBy,因为它只接受一个键。我碰巧http://codingjunkie.net/spark-secondary-sort/,因此使用repartitionAndSortWithinPartitions实现相同。

为什么sortBy接受自定义比较器并进行排序?为什么我必须重新分配才能使用自定义比较器?

回答

0

问题1:这是方法sortBy签名

/** 
    * Return this RDD sorted by the given key function. 
    */ 
    def sortBy[K](
     f: (T) => K, 
     ascending: Boolean = true, 
     numPartitions: Int = this.partitions.length) 
     (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope { 
    this.keyBy[K](f) 
     .sortByKey(ascending, numPartitions) 
     .values 
    } 

你RDD数据对象是T型的,很明显

请注意,sortBy方法绝对有单个关键参数字段:f: (T) => K

它接受匿名函数,因此您可以轻松地生成自定义的可比较结构,并最大限度地使用具有自己明确定义的比较器的常见数据类型。

例如,如果您的RDD [INT,INT],我们称之为数据,你可以做的是如下:

val cmp = (t: (Int, Int)) => (t._1, -t._2) 
data.sortBy(cmp) 

这可以实现多领域比较容易,对不对?

这将得到一个排序的RDD,第一场上升和第二场 下降。

问题2:rep​​artitionAndSortWithinPartitions 使用

这是一个特定的操作者RDD旨在更有效的比调用重新分配,然后在每个分区内排序。

您的程序在排序之前不需要预先重新分区,它只是针对此高性能特定常见模式进行内部优化。

详情请参考document

+0

我使用Java,我不认为我可以写一个相当于'val cmp =(t:(Int,Int))=>(t._1,-t._2)'的lambda表达式。 Java中的'sortBy'允许我只拾取一个用于排序的密钥。请参考http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaRDD.html#sortBy-org.apache.spark.api.java.function 。功能 - 布尔内部 - –

0
  • mapPartitions使用每个分区进行排序,例如,.sorted
  • repartitionAndSortWithinPartitions有效 排序分区的同时重新分区。
  • sortBy做出全局排序 RDD
  • RDD的sortByKey方法是使用总订货

  • RDD的repartitionAndSortWithinPartitions使用分区中的排序,但不能跨分区,但不幸的是它增加额外的步骤来做重新分区

作为写入的火花API中,repartitionAndSortWithinPartitions比调用重新分配,然后在每个分区内排序更高效换句话说repartitionAndSortWithinPartitions由密钥将第一重新分区基于所提供的分区中的数据,并且然后进行排序:

所以首先重新分区,然后调用sortBy给你不错的表现 同样可以实现使用repartitionAndSortWithinPartitions

添加一些排序的例子希望这将有助于。

例1

val rdd = sc.parallelize(Seq(
    |    ("math", 55), 
    |    ("math", 56), 
    |    ("english", 57), 
    |    ("english", 58), 
    |    ("science", 59), 
    |    ("science", 54))) 

rdd.collect() 

//Default Sorting : Ascending order 
val sorted1 = rdd.sortByKey() 

sorted1.collect() 

//Custom Sorting : Descending order (using implicit 'Ordering') 
{ 
    | //Let us define an implicit sorting for the method sortByKey() 
    | //We have used '{' above to limit the scope of the implicit ordering 
    | implicit val sortIntegersByString = new Ordering[String] { 
    |  override def compare(a: String, b: String) = { 
    |   val result = a.compare(b) 
    |   //We use -ve to sort the key in descending order 
    |   -result 
    |  } 
    | } 
    | val sorted2 = rdd.sortByKey() 
    | 
    | //Result 
    | sorted2.collect() 
    | } 

//Default Sorting : Descending order (done using the 'ascending' flag argument) 
val sorted3 = rdd.sortByKey(false) 

//Result 
sorted3.collect() 

结果:

rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[101] at parallelize at command-1784487111427703:1 
sorted1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[104] at sortByKey at command-1784487111427703:12 
sorted3: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[110] at sortByKey at command-1784487111427703:34 
res28: Array[(String, Int)] = Array((science,59), (science,54), (math,55), (math,56), (english,57), (english,58)) 

例2

case class Row(var firstName: String, var lastName: String, var city: String) 

var rows = List(new Row("Oscar", "Wilde", "London"), 
       new Row("Otto", "Swift", "Berlin"), 
       new Row("Carl", "Swift", "Paris"), 
       new Row("Hans", "Swift", "Dublin"), 
       new Row("Hugo", "Swift", "Sligo")) 

//print ("sort by last name") 
//rows.sortBy(_.lastName) 


print ("sort by last name and first name") 

rows.sortBy(r => (r.lastName, r.firstName)) 



sort by last name and first namedefined class Row 
rows: List[Row] = List(Row(Oscar,Wilde,London), Row(Otto,Swift,Berlin), Row(Carl,Swift,Paris), Row(Hans,Swift,Dublin), Row(Hugo,Swift,Sligo)) 
res26: List[Row] = List(Row(Carl,Swift,Paris), Row(Hans,Swift,Dublin), Row(Hugo,Swift,Sligo), Row(Otto,Swift,Berlin), Row(Oscar,Wilde,London)) 

RDD VS数据集:

val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
case class MyRecord(time: Double, id: String) 
val rdd = sc.parallelize(1 to 200, 200).flatMap(x =>Seq.fill(10000)(MyRecord(util.Random.nextDouble, "xxx"))) 
// sort this RDD by time: 
val sorted = rdd.sortBy(x => x.time) 
result.count 

// convert the original RDD to Dataframe and sort again: 
val df = sqlContext.createDataFrame(rdd) 
df.registerTempTable("data") 
val result = sqlContext.sql("select * from data order by time") 
result.count