0

在运行Apache Spark作业时遇到的问题之一是将RDD中的每个元素相互相乘。 简单地说,我希望做一些类似的,将Spark RDD中的元素互相相加

enter image description here

目前,我这样做是使用2次迭代的每个“的foreach”。我的直觉是,这可以以高效的方式完成。

for (elementOutSide <- iteratorA) { 
    for (elementInside <- iteratorB) { 
    if (!elementOutSide.get(3).equals(elementInside.get(3))) { 
     val multemp = elementInside.getLong(3) * elementOutSide.getLong(3) 
     .... 
     ... 

}}} 

谁能帮我纠正和改善这种情况?提前致谢 .. !!

+0

我认为你正在寻找一个普通的笛卡尔连接。 – Alec

+1

顺便说一句,你的实现并不真正符合要求 - 它比较了实际的_elements_而不是它们的_indices_--,当且仅当原始RDD的记录是_unique_时,它才起作用。 –

+0

它们是唯一的,RDD是使用保证的sql查询构建的。 – Infamous

回答

1

正如评论指出的那样,这是一个笛卡尔连接。下面是它可以在一个RDD[(Int, String)],在这里我们感兴趣的是每两个不相同的Int S的乘法来完成:

val rdd: RDD[(Int, String)] = sc.parallelize(Seq(
    (1, "aa"), 
    (2, "ab"), 
    (3, "ac") 
)) 

// use "cartesian", then "collect" to map only relevant results 
val result: RDD[Int] = rdd.cartesian(rdd).collect { 
    case ((t1: Int, _), (t2: Int, _)) if t1 != t2 => t1 * t2 
} 

注:此实现假定输入记录是独一无二的,因为指示。如果它们不是,则可以在比较指数而不是数值的同时执行笛卡尔连接和rdd.zipWithIndex结果的映射。