7
我已经实现了一个解决方案,通过密钥对RDD[K, V]
组进行分组,并根据每个组(K, RDD[V])
使用partitionBy
和Partitioner
计算数据。尽管如此,我不确定它是否真的有效率,我想要有你的观点。使用PartitionBy通过密钥拆分和有效计算RDD组
下面是一个示例情况:根据[K: Int, V: Int]
列表,计算V
什么恶意每组K
,知道应当分布和V
值可能是非常大的。这应该给:
List[K, V] => (K, mean(V))
简单的分区程序类:
class MyPartitioner(maxKey: Int) extends Partitioner {
def numPartitions = maxKey
def getPartition(key: Any): Int = key match {
case i: Int if i < maxKey => i
}
}
分区代码:
val l = List((1, 1), (1, 8), (1, 30), (2, 4), (2, 5), (3, 7))
val rdd = sc.parallelize(l)
val p = rdd.partitionBy(new MyPartitioner(4)).cache()
p.foreachPartition(x => {
try {
val r = sc.parallelize(x.toList)
val id = r.first() //get the K partition id
val v = r.map(x => x._2)
println(id._1 + "->" + mean(v))
} catch {
case e: UnsupportedOperationException => 0
}
})
输出是:
我的问题是:
- 拨打
partitionBy
时会发生什么? (对不起,我没有找到足够的规格) - 通过分区映射是否真的有效率,知道在我的生产环境中它不会有太多的键(如样本50) 100万样本)
paralellize(x.toList)
的费用是多少?这是否一致? (我需要RDD
输入mean()
)- 你会怎样做自己?
问候
谢谢你的回答,当然它不能工作,我没有火花编码技巧的所有反射,我已经被我的本地jvm宠坏了。尽管如此,实际上我不需要计算平均值,但是需要一个复杂的ml方法,而且我需要一个RDD [Vector]。我怎么能从一个独特的RDD [Int,Int]中获得(key,RDD [Vector])列表?我没有找到解决方案。 – Seb 2015-02-10 09:52:06
我认为这是一个类似的话题,然后:http://stackoverflow.com/questions/28166190/spark-column-wise-word-count/28199302#28199302我不知道你想如何使'矢量'从'Int's。但是,如果您想为每个密钥获取一个RDD,则需要拆分原始的RDD,并在链接的答案中对此进行了讨论。如果它没有给你答案,我建议提出另一个问题,或许是对你想要做的事情有一个清晰的,高层次的解释。 – 2015-02-10 12:36:19