2016-07-29 60 views
0

给定RDD[(A, B)],其中AB之间存在多对多关系,如何将关系的交集分组?在Spark中合并相交多对多关系

即,如果可以通过一个或多个B s从一个A到另一个A绘制关系,则应该对它们进行分组。同样,B s可以通过A s进行分组。

例如,集合:

(1, 'a') 
(2, 'a') 
(2, 'b') 
(1, 'c') 
(3, 'f') 
(4, 'f') 
(5, 'g') 

应组成

([1,2], ['a','b','c']) 
([3,4], ['f']) 
([5], ['g']) 

我可以使用groupByKey获得

(1, ['a', 'c']) 
(2, ['a', 'b']) 
(3, ['f']) 
(4, ['f']) 
(5, ['g']) 

并且还

('a', [1, 2]) 
('b', [2]) 
('c', [1]) 
('f', [3,4]) 
('g', [5]) 

但我不知道在哪里把它从这里开始。

+0

RDD不支持这样的行动在箱子外面!我认为,第一步是正确的。在任何groupBy之后,您需要根据需要对列表进行折叠。 – rakesh

回答

0
object ManyToMany extends App { 
    val m = List((1, 'a'), 
    (2, 'a'), 
    (2, 'b'), 
    (1, 'c'), 
    (3, 'f'), 
    (4, 'f'), 
    (5, 'g')) 

    val mInt: Map[Int, Set[Char]] = m.groupBy(_._1).map { case (a, b) => a -> b.map { case (c, d) => d }.toSet } 
    val mChar: Map[Char, Set[Int]] = m.groupBy(_._2).map { case (a, b) => a -> b.map { case (c, d) => c }.toSet } 
    def isIntersect[A](as: List[Set[A]], bs: Set[A]): List[Set[A]] = as.filter { x => x.exists { y => bs.contains(y) } } 
    val c = m.map { case (a, b) => mInt(a) }.foldLeft(List.empty[Set[Char]]) { 
    case (sum, item) => 
     isIntersect(sum, item) match { 
     case Nil => item :: sum 
     case List(x) => 
      sum.filterNot(_ == x) ++ List(x ++ item) 
     } 
    } 
    val d = c.map(x => (x, x.map(mChar(_)).foldLeft(Set.empty[Int]) {  case (sum, i) => sum ++ i })) 
    println(d) 
} 
result: 
List((Set(g),Set(5)), (Set(a, c, b),Set(1, 2)), (Set(f),Set(3, 4)))