假设我有如下的数据:取N个值从每个分区在火花
val DataSort = Seq(("a",5),("b",13),("b",2),("b",1),("c",4),("a",1),("b",15),("c",3),("c",1))
val DataSortRDD = sc.parallelize(DataSort,2)
现在有两个分区与:
scala>DataSortRDD.glom().take(2).head
res53: Array[(String,Int)] = Array(("a",5),("b",13),("b",2),("b",1),("c",4))
scala>DataSortRDD.glom().take(2).tail
res54: Array[(String,Int)] = Array(Array(("a",1),("b",15),("c",3),("c",2),("c",1)))
假设在每一个分区中的数据已经使用类似sortWithinPartitions(col("src").desc,col("rank").desc)
(这是一个数据帧,但只是为了说明)排序。
我想从每个分区获得每个字母的前两个值(如果有超过2个值)。因此,在这个例子中,结果在每个分区应该是:
scala>HypotheticalRDD.glom().take(2).head
Array(("a",5),("b",13),("b",2),("c",4))
scala>HypotheticalRDD.glom().take(2).tail
Array(Array(("a",1),("b",15),("c",3),("c",2)))
我知道,我必须使用mapPartition
功能,但它在我心中并不清楚知道怎样才能在每个分区中的值进行迭代,并获得第一2.任何提示?
编辑:更确切地说,我知道在每个分区中,数据已经先按'字母'排序,然后按'count'排序。所以我的主要想法是mapPartition
中的输入函数应该遍历分区,并且yield
是每个字母的前两个值。这可以通过检查每个迭代值来完成。这就是我可以在Python写:
def limit_on_sorted(iterator):
oldKey = None
cnt = 0
while True:
elem = iterator.next()
if not elem:
return
curKey = elem[0]
if curKey == oldKey:
cnt +=1
if cnt >= 2:
yield None
else:
oldKey = curKey
cnt = 0
yield elem
DataSortRDDpython.mapPartitions(limit_on_sorted,preservesPartitioning=True).filter(lambda x:x!=None)
不要紧,最终的结果是怎么_partitioned_?换句话说 - 如果你得到了相同的结果,但分区不同,那还是可以的吗?如预期的那样,过滤仍将基于原始分区。 –