2016-07-27 50 views
0

假设我有如下的数据:取N个值从每个分区在火花

val DataSort = Seq(("a",5),("b",13),("b",2),("b",1),("c",4),("a",1),("b",15),("c",3),("c",1)) 
val DataSortRDD = sc.parallelize(DataSort,2) 

现在有两个分区与:

scala>DataSortRDD.glom().take(2).head 
res53: Array[(String,Int)] = Array(("a",5),("b",13),("b",2),("b",1),("c",4)) 
scala>DataSortRDD.glom().take(2).tail 
res54: Array[(String,Int)] = Array(Array(("a",1),("b",15),("c",3),("c",2),("c",1))) 

假设在每一个分区中的数据已经使用类似sortWithinPartitions(col("src").desc,col("rank").desc)(这是一个数据帧,但只是为了说明)排序。

我想从每个分区获得每个字母的前两个值(如果有超过2个值)。因此,在这个例子中,结果在每个分区应该是:

scala>HypotheticalRDD.glom().take(2).head 
Array(("a",5),("b",13),("b",2),("c",4)) 
scala>HypotheticalRDD.glom().take(2).tail 
Array(Array(("a",1),("b",15),("c",3),("c",2))) 

我知道,我必须使用mapPartition功能,但它在我心中并不清楚知道怎样才能在每个分区中的值进行迭代,并获得第一2.任何提示?

编辑:更确切地说,我知道在每个分区中,数据已经先按'字母'排序,然后按'count'排序。所以我的主要想法是mapPartition中的输入函数应该遍历分区,并且yield是每个字母的前两个值。这可以通过检查每个迭代值来完成。这就是我可以在Python写:

def limit_on_sorted(iterator): 
    oldKey = None 
    cnt = 0 
    while True: 
     elem = iterator.next() 
     if not elem: 
      return 
     curKey = elem[0] 
     if curKey == oldKey: 
      cnt +=1 
      if cnt >= 2: 
       yield None 
     else: 
      oldKey = curKey 
      cnt = 0 
     yield elem 

DataSortRDDpython.mapPartitions(limit_on_sorted,preservesPartitioning=True).filter(lambda x:x!=None) 
+0

不要紧,最终的结果是怎么_partitioned_?换句话说 - 如果你得到了相同的结果,但分区不同,那还是可以的吗?如预期的那样,过滤仍将基于原始分区。 –

回答

1

假设你真的不关心结果的分区,你可以使用mapPartitionsWithIndex纳入分区ID成关键您groupBy,那么你可以很容易地把前两项为每个这样的关键:

val result: RDD[(String, Int)] = DataSortRDD 
    .mapPartitionsWithIndex { 
    // add the partition ID into the "key" of every record: 
    case (partitionId, itr) => itr.map { case (k, v) => ((k, partitionId), v) } 
    } 
    .groupByKey() // groups by letter and partition id 
    // take only first two records, and drop partition id 
    .flatMap { case ((k, _), itr) => itr.take(2).toArray.map((k, _)) } 

println(result.collect().toList) 
// prints: 
// List((a,5), (b,15), (b,13), (b,2), (a,1), (c,4), (c,3)) 

请注意,最终的结果(groupByKey改变了分区),我是,假设这对你想要做的事情(坦率地说,逃脱了我)并不关键。

编辑:如果你想避免洗牌和每个分区中的所有操作:

val result: RDD[(String, Int)] = DataSortRDD 
    .mapPartitions(_.toList.groupBy(_._1).mapValues(_.take(2)).values.flatten.iterator, true) 
+0

感谢您的回答。也许我应该在问题中提及它。我想使用'mapPartition'的原因是因为我想避免出于效率原因在分区之间进行混洗。在你使用'groupByKey'的解决方案中,有洗牌。 –

+0

我明白了。编辑我的答案,包括一个没有洗牌的解决方案(保留分区) –

+0

您的回答是正确的。我关心的是'groupBy(_._ 1)'。为什么当我知道这些值已经按字母和按数字排序后需要分组?我已经更新了我的问题以更清晰地表明我的想法。 –