2016-11-12 41 views
0
rdd_1 = [(k1, (v1, v2)), (k2, (v3, v4, v5))] 
rdd_2 = [(v1, (w1)), (v3, (w2, w3)), (v5, (w4))] 

我想这样的一个新的RDD rdd = [(k1, (w1)), (k2, (w2, w3, w4))]如何Concat的两个火花

如何使用Python做到这一点火花RDD?

回答

1

flatMap,joingroupByKey应该完成这项工作(按此顺序使用)。

+0

你的意思是先得到[(v1,k1),(v2,k1),(v3,k2)..],然后做'join'?我无法弄清楚,你能解释一下更详细的内容吗?谢谢。 –

+0

当然,你可以先用不工作的代码更新问题吗?我可以帮助你找到你方法中的错误 – Mariusz

+0

好吧,让我先试试吧 –

0

以下是Scala中的完整工作代码。这段代码基本上使用了三个变换flatMap,join和groupBy。这里的问题是join和groupBy键必须不同。所以首先我们在rdd_1上使用flatMap来获得(v,k)类型的rdd。现在我们有类型(v,k)和(v,List(w))的rdds,所以我们根据v加入。我们加入的rdd的类型将为(v,(k,List(w)))。最后,我们通过将一个函数x => x._2._1 groupBy K转换为groupBy转换。下面是整个代码: - >

val rdd1 = sc.parallelize(Seq(("k1", List("v1", "v2")), ("k2", List("v3", "v4", "v5")))) 
val rdd2 = sc.parallelize(Seq(("v1", List("w1")), ("v3", List("w2", "w3")), ("v5", List("w4")))) 
val flattenedRdd1 = rdd1 flatMap { 
    case (x, y) => { 
    val lb = new ListBuffer[(String, String)] 
    y.foreach { v => lb += ((v, x)) } 
    lb 
    } 
} 
val joined = flattenedRdd1 join rdd2 
val result = joined.groupBy { x => x._2._1 }.map { 
    case (x, itr) => { 
    val lb = new ListBuffer[String] 
    itr.foreach { 
     case (f, s) => { 
     lb.++=(s._2) 
     } 
    } 
    (x, lb) 
    } 
} 
result.foreach(println(_)).