2015-07-22 129 views
2

后,我映射我RDD到如何使用reduceByKey将值添加到Scala Spark中的Set中?

((_id_1, section_id_1), (_id_1, section_id_2), (_id_2, section_3), (_id_2, section_4)) 

我想reduceByKey

((_id_1, Set(section_id_1, section_id_2), (_id_2, Set(section_3, section_4))) 
val collectionReduce = collection_filtered.map(item => { 
     val extras = item._2.get("extras") 
     var section_id = "" 
     var extras_id = "" 
     if (extras != null) { 
     val extras_parse = extras.asInstanceOf[BSONObject] 
     section_id = extras_parse.get("guid").toString 
     extras_id = extras_parse.get("id").toString 
     } 
     (extras_id, Set {section_id}) 
    }).groupByKey().collect() 

我的输出

((_id_1, (Set(section_1), Set(section_2))), (_id_2, (Set(section_3), Set(section_4)))) 

我该如何解决呢?

在此先感谢。

回答

3

通过简单地使用++来组合列表,您可以使用reduceByKey

val rdd = sc.parallelize((1, Set("A")) :: (2, Set("B")) :: (2, Set("C")) :: Nil) 
val reducedRdd = rdd.reduceByKey(_ ++ _) 
reducedRdd.collect() 
// Array((1,Set(A)), (2,Set(B, C))) 

在你的情况:

collection_filtered.map(item => { 
    // ... 
    (extras_id, Set(section_id)) 
}).reduceByKey(_ ++ _).collect() 
2

这里是groupByKey/mapValues

val rdd = sc.parallelize(List(("_id_1", "section_id_1"), ("_id_1", "section_id_2"), ("_id_2", "section_3"), ("_id_2", "section_4"))) 

rdd.groupByKey().mapValues(v => v.toSet).foreach(println) 

这里使用combineByKey另一替代(推荐超过groupByKey)的替代:

rdd.combineByKey(
     (value: String) => Set(value), 
     (x: Set[String], value: String) => x + value , 
     (x: Set[String], y:  Set[String]) => (x ++ y) 
    ).foreach(println) 
+0

上面使用reduceByKey和你的方式有什么不同?哪一个更好?谢谢。 – giaosudau

+2

'groupByKey'在这里更简单,因为您不会聚合(“* reduce *”)具有相同键的两个键值对之间的任何信息:只需将这些值连接在一起。使用reduce的实现将做同样的工作,只是稍微难以阅读版本。 – huitseeker

+2

您应该比'groupByKey'更优先使用'reduceByKey',看看这个[Spark gitbook](http://databricks.gitbooks.io/databricks-spark-knowledge-base/content/best_practices/prefer_reducebykey_over_groupbykey.html)说明。 –

相关问题