2017-02-24 63 views
1

我有两个RDD(K,V),在火花它不允许两个映射嵌套。检查如果一个RDD(K,V)V是包含在另一个R dd(K,V)V

val x = sc.parallelize(List((1,"abc"),(2,"efg"))) 
val y = sc.parallelize(List((1,"ab"),(2,"ef"), (3,"tag")) 

如果RDD很大,我想检查“abc”是否包含“ab”。

+0

您是否可以使用所需的输出更新您正在查找的问题。 –

+0

谢谢,我想知道“abc”是否包含“ab”,输出如(abc,efg) – ozil

回答

0

假设你想从RDD X时,它的子选择一个值出现在RDDŸ那么这段代码应该工作。

def main(args: Array[String]): Unit = { 
    val x = spark.sparkContext.parallelize(List((1, "abc"), (2, "efg"))) 
    val y = spark.sparkContext.parallelize(List((1, "ab"), (2, "ef"), (3, "tag"))) 

    // This RDD is filtered. That is we are selecting elements from x only if the substring of the value is present in 
    // the RDD y. 
    val filteredRDD = filterRDD(x, y) 
    // Now we map the filteredRDD to our result list 
    val resultArray = filteredRDD.map(x => x._2).collect() 
} 

def filterRDD(x: RDD[(Int, String)], y: RDD[(Int, String)]): RDD[(Int, String)] = { 
    // Broadcasting the y RDD to all spark nodes, since we are collecting this before hand. 
    // The reason we are collecting the y RDD is to avoid call collect in the filter logic 
    val y_bc = spark.sparkContext.broadcast(y.collect.toSet) 
    x.filter(m => { 
    y_bc.value.exists(n => m._2.contains(n._2)) 
    }) 
} 
+1

谢谢,数据量小时使用广播。 – ozil

相关问题