2016-02-24 33 views
0

所以我遇到了一个速度问题,我有一个数据集需要多次聚合。一次汇总多个值

最初我的团队已经建立了三个累加器,并对数据运行一个foreach循环。沿

val accum1:Accumulable[a] 
val accum2: Accumulable[b] 
val accum3: Accumulable[c] 

data.foreach{ 
     u => 
       accum1+=u 
       accum2 += u 
       accum3 += u 
} 

我想这些积累切换到聚集,这样我可以得到一个速度提升,并有机会获得蓄能器进行调试线的东西。我目前正试图找出一种方法来一次聚合这三种类型,因为运行3个独立的聚合速度要慢得多。有没有人有任何想法,我怎么能做到这一点?也许无法聚合,然后将模式匹配分解为两个RDD?

谢谢

+0

是否要立即执行任意集合集? – zero323

+0

@ zero323不,它是3个特定的聚合。只是其中一个与其他人有不同的类型。 –

回答

1

至于我可以告诉所有你需要在这里是aggregate与对应着被你的累加器执行的操作zeroValueseqOpcombOp

val zeroValue: (A, B, C) = ??? // (accum1.zero, accum2.zero, accum3.zero) 

def seqOp(r: (A, B, C), t: T): (A, B, C) = r match { 
    case (a, b, c) => { 
    // Apply operations equivalent to 
    // accum1.addAccumulator(a, t) 
    // accum2.addAccumulator(c, t)) 
    // accum3.addAccumulator(c, t) 
    // and return the first argument 
    // r 
    } 
} 

def combOp(r1: (A, B, C), r2: (A, B, C)): (A, B, C) = (r1, r2) match { 

    case ((a1, b1, c1), (a2, b2, c2)) => { 
    // Apply operations equivalent to 
    // acc1.addInPlace(a1, a2) 
    // acc2.addInPlace(b1, b2) 
    // acc3.addInPlace(c1, c2) 
    // and return the first argument 
    // r1 
    } 
} 

val rdd: RDD[T] = ??? 

val accums: (A, B, C) = rdd.aggregate(zeroValue)(seqOp, combOp) 
+0

嗨zero323。所以不幸的是,当我试图尝试这种方法时,遇到了一些非常令人讨厌的垃圾收集问题,这些问题在我单独执行每个值时没有看到。自从Scala将每个元组视为一个新对象以来,我有可能通过创建元组来处理严重的GC问题吗?每个值的类型都是可变的。地图 –

+0

如果你返回新的元组,那么每个元组都是一个新的对象。我的意思是它仍然会引用相同的地图,但是元组本身并不相同。你应该返回变异的参数。 'combOp'每个分区应用一次,所以它应该没关系,尽管你可以做同样的事情。 – zero323

+0

一般来说''聚合'(或一般的地图端聚合)可能在GC上很繁重(请参阅https://issues.apache.org/jira/browse/SPARK-772),尽管我不认为还有其他问题在这里返回新的元组。 – zero323