2013-10-04 52 views
3

我想实现特定的算法,但是我很难找到一份工作的良好数据结构。该算法的简化版本就像下面:使用不可变数据结构来变更数据

Input: A set of points. 
Output: A new set of points. 
Step 1: For each point, calculate the closest points in a radius. 
Step 2: For each point, calculate a value "v" from the closest points subset. 
Step 3: For each point, calculate a new value "w" from the closest points and 
     the values "v" from the previous step, i.e, "w" depends on the neighbors 
     and "v" of each neighbor. 
Step 4: Update points. 

在C++中,我可以解决这个问题是这样的:用一个天真结构

struct Point { 
    Vector position; 
    double v, w; 
    std::vector<Point *> neighbors; 
}; 

std::vector<Point> points = initializePoints(); 
calculateNeighbors(points); 
calculateV(points); // points[0].v = value; for example. 
calculateW(points); 

如点的列表,我无法更新值“v”到原始的一组点中,并且需要两次计算邻居。我怎样才能避免这种情况并保持函数是纯粹的,因为计算邻居是算法中最昂贵的部分(超过30%的时间)?

PS .:对于那些经验丰富的数值方法和CFD,这是平滑粒子流体动力学方法的简化版本。

更新:更改步骤3,使其更清晰。

+0

我喜欢@ChrisTaylor给出的答案。我还在'ST s' monad中使用了可变向量(来自'vectors'包),用于需要数组中存储数据的实时变异的算法。 –

回答

6

Haskell根本不提供突变是一个常见的神话。实际上,它提供了一种非常特殊的突变:一个值可以从未评估到评估完全改变一次。利用这种特殊类型的突变的技术被称为tying the knot。我们将与数据结构就像你的一个从C++开始:

data Vector -- held abstract 

data Point = Point 
    { position :: Vector 
    , v, w  :: Double 
    , neighbors :: [Point] 
    } 

现在,我们要做的是建立一个Array Pointneighbors包含指向同一阵列中的其他元素。 Array在下面的代码中的主要特点是它很懒惰(它不会过早强制它的元素)并且具有快速的随机访问;如果您愿意,您可以使用这些属性替换您最喜欢的备用数据结构。

邻居寻找功能的接口有很多选择。为了具体并使我自己的工作变得简单,我会假设你有一个函数,它需要一个Vector和一个Vectors的列表,并给出邻居的索引。

findNeighbors :: Vector -> [Vector] -> [Int] 
findNeighbors = undefined 

让我们也采取了一些类型computeVcomputeW。对于随机数,我们会要求computeV符合您所陈述的非正式合约,即可以查看的positionneighbors字段,但不能查看vw字段。 (同样的,computeW可能只看到w字段中的任何Point)。实际上可以在没有太多体操类型的情况下强制执行此操作,但现在让我们跳过它。

computeV, computeW :: Point -> Double 
(computeV, computeW) = undefined 

现在我们准备构建我们的(标记的)内存图。

buildGraph :: [Vector] -> Array Int Point 
buildGraph vs = answer where 
    answer = listArray (0, length vs-1) [point pos | pos <- vs] 
    point pos = this where 
     this = Point 
      { position = pos 
      , v = computeV this 
      , w = computeW this 
      , neighbors = map (answer!) (findNeighbors pos vs) 
      } 

就是这样,真的。现在,你可以写你的

newPositions :: Point -> [Vector] 
newPositions = undefined 

其中newPositions是完全免费检查任何它交到Point的领域,并把所有的功能整合在一起:

update :: [Vector] -> [Vector] 
update = newPositions <=< elems . buildGraph 

编辑:...解释开始时的“特殊种类突变”评论:在评估期间,您可以期望当您要求w字段的Point事情将按以下顺序发生时:computeW将强制v字段;那么computeV将强制neighbors字段;那么neighbors字段将从未评估变为评估;那么v字段将从未评估变为评估;那么w字段将从未评估变为评估。最后三个步骤看起来非常类似于C++算法的三个突变步骤!

双编辑:我决定我想看到这个东西运行,所以我实例化了所有上面用虚拟实现进行抽象的东西。我也希望看到它只评估一次,因为我甚至不知道我做得对!所以我扔了一些trace电话。这里有一个完整的文件:

import Control.Monad 
import Data.Array 
import Debug.Trace 

announce s (Vector pos) = trace $ "computing " ++ s ++ " for position " ++ show pos 

data Vector = Vector Double deriving Show 

data Point = Point 
    { position :: Vector 
    , v, w  :: Double 
    , neighbors :: [Point] 
    } 

findNeighbors :: Vector -> [Vector] -> [Int] 
findNeighbors (Vector n) vs = [i | (i, Vector n') <- zip [0..] vs, abs (n - n') < 1] 

computeV, computeW :: Point -> Double 
computeV (Point pos _ _ neighbors) = sum [n | Point { position = Vector n } <- neighbors] 
computeW (Point pos v _ neighbors) = sum [v | Point { v = v } <- neighbors] 

buildGraph :: [Vector] -> Array Int Point 
buildGraph vs = answer where 
    answer = listArray (0, length vs-1) [point pos | pos <- vs] 
    point pos = this where { this = Point 
     { position = announce "position" pos $ pos 
     , v   = announce "v" pos $ computeV this 
     , w   = announce "w" pos $ computeW this 
     , neighbors = announce "neighbors" pos $ map (answer!) (findNeighbors pos vs) 
     } } 

newPositions :: Point -> [Vector] 
newPositions (Point { position = Vector n, v = v, w = w }) = [Vector (n*v), Vector w] 

update :: [Vector] -> [Vector] 
update = newPositions <=< elems . buildGraph 

和ghci中运行:

*Main> length . show . update . map Vector $ [0, 0.25, 0.75, 1.25, 35] 
computing position for position 0.0 
computing v for position 0.0 
computing neighbors for position 0.0 
computing position for position 0.25 
computing position for position 0.75 
computing w for position 0.0 
computing v for position 0.25 
computing neighbors for position 0.25 
computing v for position 0.75 
computing neighbors for position 0.75 
computing position for position 1.25 
computing w for position 0.25 
computing w for position 0.75 
computing v for position 1.25 
computing neighbors for position 1.25 
computing w for position 1.25 
computing position for position 35.0 
computing v for position 35.0 
computing neighbors for position 35.0 
computing w for position 35.0 
123 

正如你所看到的,每个字段在计算最多只会对每个位置。

+0

非常有趣和简单的解决方案。我期待的那种东西。非常感谢你。 –

3

你能做这样的事吗?鉴于以下类型签名

calculateNeighbours :: [Point] -> [[Point]] 

calculateV :: [Point] -> Double 

calculateW :: [Point] -> Double -> Double 

你可以写

algorithm :: [Point] -> [(Point, Double, Double)] 
algorithm pts =        -- pts :: [Point] 
    let nbrs = calculateNeighbours pts  -- nbrs :: [[Point]] 
     vs = map calculateV nbrs   -- vs :: [Double] 
     ws = zipWith calculateW nbrs vs -- ws :: [Double] 
    in zip3 pts vs ws      --  :: [(Point,Double,Double)] 

这将计算邻居列表中只有一次,并重新使用的vw的计算值。

如果这不是你想要的,你可以再详细一点吗?

+0

为什么不避免'zip3'? (在'calculateNeighbours'返回前简单计算'v'和'w')。 – josejuan

+1

@josejuan我假定'calculateNeighbours','calculateV'和'calculateW'函数被给出了,并且展示了如何将它们合并。当然,如果你可以重写这些功能,你可能会更有效地做到这一点。 –

+0

这正是我现在正在做的事情,它不起作用。 calculateW需要邻居和vs(分配给每个点),即邻居需要在calculateW中更新[[Point,Double]]。 –

0

我认为你应该要么使用地图(HashMap)来分别计算五世(和W的)存储从您的Point的,或者使用mutable variables来反映你的C++算法。第一种方法更“功能化”,例如你可以很容易地将parralelism添加到它中,因为所有的数据都是不可变的,但它应该慢一点,因为每次你需要逐点获取v时你必须计算hash值。