2016-09-19 56 views
0

Failing to add a key to a map in parallel

I have the following code:

var res: GenMap[Point, GenSeq[Point]] = points.par.groupBy(point => findClosest(point, means))
means.par.foreach(mean => if (!res.contains(mean)) {
    println("Map doesn't contain mean: " + mean)
    res += mean -> GenSeq.empty[Point]
    println("Map contains?: " + res.contains(mean))
})

It uses this case class:

case class Point(val x: Double, val y: Double, val z: Double) 

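Basically, the code groups the Point elements in points around the Point elements in means. The algorithm itself is not very important, though.

My problem is that I get the following output: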

Map doesn't contain mean: (0.44, 0.59, 0.73) 
Map doesn't contain mean: (0.44, 0.59, 0.73) 
Map doesn't contain mean: (0.1, 0.11, 0.11) 
Map doesn't contain mean: (0.1, 0.11, 0.11) 
Map contains?: true 
Map contains?: true 
Map contains?: false 
Map contains?: true 

Why would I ever get this?

Map contains?: false 

I am checking whether the key is in the res map. If it is not, I add it. So how can it then be missing from the map?

Is this a parallelism issue?

Does this problem also occur when you do not use parallelization? – Samar

Answers

2

There is a race condition in your code, on this line:

res += mean -> GenSeq.empty[Point] 

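Multiple threads reassign res at the same time, so some entries can be lost. Since res is a var holding an immutable map, res += x is shorthand for res = res + x, a read followed by a write, and one thread's write can overwrite the map another thread has just stored.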
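One possible way to keep the check-and-add step safe when several threads run it is a compare-and-set loop on an AtomicReference that holds the immutable map. The sketch below is only an illustration under assumptions: the names SafeMapUpdate, addIfMissing and fillInParallel are hypothetical, and plain threads stand in for the parallel collection so that the example needs nothing beyond the standard library.

```scala
import java.util.concurrent.atomic.AtomicReference

object SafeMapUpdate {
    // Hypothetical helper: add `key -> empty` only if `key` is absent.
    // compareAndSet fails when another thread replaced the map between
    // our read and our write; in that case we re-read and try again.
    def addIfMissing[K, V](ref: AtomicReference[Map[K, V]], key: K, empty: V): Unit = {
        var done = false
        while (!done) {
            val current = ref.get()
            done = current.contains(key) ||
                ref.compareAndSet(current, current + (key -> empty))
        }
    }

    // Starts one thread per key, waits for all of them, returns the final map.
    def fillInParallel[K, V](initial: Map[K, V], keys: List[K], empty: V): Map[K, V] = {
        val ref = new AtomicReference[Map[K, V]](initial)
        val threads = keys.map { k =>
            new Thread(new Runnable {
                def run(): Unit = addIfMissing(ref, k, empty)
            })
        }
        threads.foreach(_.start())
        threads.foreach(_.join())
        ref.get()
    }
}
```

A thread that loses the compare-and-set re-reads the map and retries, so no key is silently dropped and existing groups are left untouched.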

The following code solves the problem:

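// The type annotation lets the fold's accumulator hold GenSeq values
val closest: GenMap[Point, GenSeq[Point]] = points.par.groupBy(point => findClosest(point, means))
val res = means.foldLeft(closest) {
    case (map, mean) =>
        if (map.contains(mean))
            map
        else
            map + (mean -> GenSeq.empty[Point])
}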

This works and looks elegant, but it is not parallel. – octavian

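You are right, the second part is not executed in parallel. I assumed that the computationally heavy part is the execution of the findClosest() function. This is a simple solution that should be quite efficient in terms of parallelism (although it could be improved). I think that if 'means' is not very large, it will be as efficient as a fully parallel solution, if not better. Keep in mind that creating a parallel program always carries some overhead, and sometimes it is not worth parallelizing certain parts of the execution. – Mikel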
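As a sketch related to the comments above: the fill-in step can also be written as "find the missing keys, build their empty entries, merge once". Neither the filter nor the map touches shared mutable state, so under the assumption that parallel collections are available they could be applied to a parallel collection as well. The version below uses plain collections to stay self-contained, and the names FillMissing and withEmptyGroups are hypothetical.

```scala
object FillMissing {
    // Returns `grouped` extended with an empty group for every key in `keys`
    // that has no group yet. Nothing is mutated: the missing entries are
    // built first and merged into the map in a single step.
    def withEmptyGroups[K, V](grouped: Map[K, Seq[V]], keys: Seq[K]): Map[K, Seq[V]] = {
        val missing = keys.filterNot(grouped.contains).map(k => k -> Seq.empty[V])
        grouped ++ missing
    }
}
```

Whether this beats the sequential fold depends on how many means there are; for a small number of keys the overhead of parallelism may outweigh any gain.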

0

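Processing a point changes the means, and the result is sensitive to processing order, so the algorithm does not lend itself to parallel execution. If parallel execution is important enough to justify changing the algorithm, then an algorithm that can be applied in parallel can be found.

Using a known set of grouping points, such as the centres of grid squares, means that the points can be assigned to their grouping points in parallel and then grouped by grouping point in parallel: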

import scala.annotation.tailrec 
import scala.collection.parallel.ParMap 
import scala.collection.{GenMap, GenSeq, Map} 
import scala.math._ 
import scala.util.Random 

class ParallelPoint { 
    val rng = new Random(0) 

    val groups: Map[Point, Point] = (for {
        i <- 0 to 100
        j <- 0 to 100
        k <- 0 to 100
    } yield {
        val p = Point(10.0 * i, 10.0 * j, 10.0 * k)
        p -> p
    }).toMap

    val points: Array[Point] = Array.fill(10000000)(Point(rng.nextDouble() * 1000.0, rng.nextDouble() * 1000.0, rng.nextDouble() * 1000.0))

    def findClosest(point: Point, groups: GenMap[Point, Point]): (Point, Point) = {
        // Snap each coordinate to the nearest multiple of 10, i.e. the nearest grid centre
        val x: Double = rint(point.x / 10.0) * 10.0
        val y: Double = rint(point.y / 10.0) * 10.0
        val z: Double = rint(point.z / 10.0) * 10.0

        // Throws NoSuchElementException if the snapped point lies outside the grid
        val mean: Point = groups(Point(x, y, z))

        (mean, point)
    }

    @tailrec
    private def total(points: GenSeq[Point]): Option[Point] = {
        points.size match {
            case 0 => None
            case 1 => Some(points(0))
            case _ => total((points(0) + points(1)) +: points.drop(2))
        }
    }

    def mean(points: GenSeq[Point]): Option[Point] =
        total(points).map(_ / points.size)

    val startTime = System.currentTimeMillis() 

    println("starting test ...") 

    val res: ParMap[Point, GenSeq[Point]] = points.par.map(p => findClosest(p, groups)).groupBy(pp => pp._1).map(kv => kv._1 -> kv._2.map(v => v._2)) 

    val groupTime = System.currentTimeMillis() 
    println(s"... grouped result after ${groupTime - startTime}ms ...") 

    points.par.foreach(p => if (! res(findClosest(p, groups)._1).exists(_ == p)) println(s"point $p not found")) 

    val checkTime = System.currentTimeMillis() 

    println(s"... checked grouped result after ${checkTime - startTime}ms ...") 

    val means: ParMap[Point, GenSeq[Point]] = res.map{ kv => mean(kv._2).get -> kv._2 } 

    val meansTime = System.currentTimeMillis() 

    println(s"... means calculated after ${meansTime - startTime}ms.") 
} 

object ParallelPoint { 
    def main(args: Array[String]): Unit = new ParallelPoint() 
} 

case class Point(x: Double, y: Double, z: Double) {
    def +(that: Point): Point = Point(this.x + that.x, this.y + that.y, this.z + that.z)

    def /(scale: Double): Point = Point(x / scale, y / scale, z / scale)
}

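The last step replaces the grouping point with the computed mean of the grouped points as the map key. On my 2011 MBP this processes 10 million points in about 30 seconds.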
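The two steps described above (snap each point to a grid centre, then re-key each group by the mean of its members) can be tried on a handful of points with the minimal sketch below. It is an illustration only: GridGrouping, P3, snap and groupByMean are hypothetical names, the cell size is passed in as an assumed parameter, and plain sequential collections are used so that the example runs without the parallel collections.

```scala
object GridGrouping {
    // Stand-in for the Point class above, kept separate to stay self-contained.
    final case class P3(x: Double, y: Double, z: Double) {
        def +(that: P3): P3 = P3(x + that.x, y + that.y, z + that.z)
        def /(n: Double): P3 = P3(x / n, y / n, z / n)
    }

    // Snap a point to the nearest grid centre; centres are multiples of `cell`.
    def snap(p: P3, cell: Double): P3 =
        P3(math.rint(p.x / cell) * cell,
           math.rint(p.y / cell) * cell,
           math.rint(p.z / cell) * cell)

    // Group by grid centre, then replace each key with the mean of its group.
    // Groups produced by groupBy are never empty, so reduce is safe here.
    def groupByMean(points: Seq[P3], cell: Double): Map[P3, Seq[P3]] =
        points.groupBy(p => snap(p, cell)).map { case (_, members) =>
            (members.reduce(_ + _) / members.size.toDouble) -> members
        }
}
```

For example, with a cell size of 10 the points (1, 1, 1) and (3, 3, 3) both snap to (0, 0, 0) and end up under the key (2, 2, 2), their mean. Note that if two groups happened to have exactly the same mean, they would collide on one key in the resulting map.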