2015-02-09 51 views
7

我已经实现了一个解决方案,通过密钥对RDD[K, V]组进行分组,并根据每个组(K, RDD[V])使用partitionByPartitioner计算数据。尽管如此,我不确定它是否真的有效率,我想要有你的观点。使用PartitionBy通过密钥拆分和有效计算RDD组

下面是一个示例情况:根据[K: Int, V: Int]列表,计算V什么恶意每组K,知道应当分布和V值可能是非常大的。这应该给:

List[K, V] => (K, mean(V)) 

简单的分区程序类:

class MyPartitioner(maxKey: Int) extends Partitioner { 

    def numPartitions = maxKey 

    def getPartition(key: Any): Int = key match { 
     case i: Int if i < maxKey => i 
    } 
    } 

分区代码:

val l = List((1, 1), (1, 8), (1, 30), (2, 4), (2, 5), (3, 7)) 

     val rdd = sc.parallelize(l) 
     val p = rdd.partitionBy(new MyPartitioner(4)).cache() 

     p.foreachPartition(x => { 
     try { 
      val r = sc.parallelize(x.toList) 
      val id = r.first() //get the K partition id 
      val v = r.map(x => x._2) 
      println(id._1 + "->" + mean(v)) 
     } catch { 
      case e: UnsupportedOperationException => 0 
     } 
     }) 

输出是:

​​

我的问题是:

  1. 拨打partitionBy时会发生什么? (对不起,我没有找到足够的规格)
  2. 通过分区映射是否真的有效率,知道在我的生产环境中它不会有太多的键(如样本50) 100万样本)
  3. paralellize(x.toList)的费用是多少?这是否一致? (我需要RDD输入mean()
  4. 你会怎样做自己?

问候

回答

4

您的代码不应该工作。您不能将SparkContext对象传递给执行者。 (这不是Serializable。)我也不明白你为什么需要。

要计算平均值,您需要计算总和和计数并取其比率。默认的分区将会很好。

def meanByKey(rdd: RDD[(Int, Int)]): RDD[(Int, Double)] = { 
    case class SumCount(sum: Double, count: Double) 
    val sumCounts = rdd.aggregateByKey(SumCount(0.0, 0.0))(
    (sc, v) => SumCount(sc.sum + v, sc.count + 1.0), 
    (sc1, sc2) => SumCount(sc1.sum + sc2.sum, sc1.count + sc2.count)) 
    sumCounts.map(sc => sc.sum/sc.count) 
} 

这是一种高效的单程计算,可以很好地推广。

+0

谢谢你的回答,当然它不能工作,我没有火花编码技巧的所有反射,我已经被我的本地jvm宠坏了。尽管如此,实际上我不需要计算平均值,但是需要一个复杂的ml方法,而且我需要一个RDD [Vector]。我怎么能从一个独特的RDD [Int,Int]中获得(key,RDD [Vector])列表?我没有找到解决方案。 – Seb 2015-02-10 09:52:06

+0

我认为这是一个类似的话题,然后:http://stackoverflow.com/questions/28166190/spark-column-wise-word-count/28199302#28199302我不知道你想如何使'矢量'从'Int's。但是,如果您想为每个密钥获取一个RDD,则需要拆分原始的RDD,并在链接的答案中对此进行了讨论。如果它没有给你答案,我建议提出另一个问题,或许是对你想要做的事情有一个清晰的,高层次的解释。 – 2015-02-10 12:36:19