2

我想实现一个特定的算法,但我在为这项工作找到一个好的数据结构时遇到了麻烦。该算法的一个更简单版本的工作原理如下:

Input: A set of points.
Output: A new set of points.
Step 1: For each point, calculate the closest points in a radius.
Step 2: For each point, calculate a value "v" from the closest points subset.
Step 3: For each point, calculate a new value "w" from the closest points and
        the values "v" from the previous step, i.e, "w" depends on the neighbors
        and "v" of each neighbor.
Step 4: Update points.

在 C++ 中,我可以这样解决:

struct Point {
    Vector position;
    double v, w;
    std::vector<Point *> neighbors;
};

std::vector<Point> points = initializePoints();
calculateNeighbors(points);
calculateV(points); // points[0].v = value; for example.
calculateW(points);

对于诸如点列表之类的简单结构,我无法将值“v”更新为原始点集,并且需要计算两次邻居。我怎样才能避免这种情况并保持函数纯净,因为计算邻居是算法中最昂贵的部分(超过 30% 的时间)?

PS.:对于那些在数值方法和 CFD 方面有经验的人,这是平滑粒子流体动力学方法的简化版本。

更新:更改了第 3 步,使其更清晰。

4

3 回答 3

6

Haskell 根本不提供突变是一个普遍的神话。实际上,它提供了一种非常特殊的变异:一个值只能变异一次,从未评估到评估。利用这种特殊突变的艺术被称为打结。我们将从一个类似于 C++ 的数据结构开始:

data Vector -- held abstract

data Point = Point
    { position  :: Vector
    , v, w      :: Double
    , neighbors :: [Point]
    }

现在,我们要做的是构建一个Array Point包含neighbors指向同一数组中其他元素的指针。以下代码的主要特点Array是它是spine-lazy(它不会过早地强制它的元素)并且具有快速的随机访问;如果您愿意,可以用这些属性替换您最喜欢的备用数据结构。

找邻居功能的接口有很多选择。为了具体起见并使我自己的工作变得简单,我假设你有一个函数,它接受一个Vector和一个列表Vectors并给出邻居的索引。

findNeighbors :: Vector -> [Vector] -> [Int]
findNeighbors = undefined

让我们也为computeV和设置一些类型computeW。对于 nonce,我们将要求computeV遵守您所说的非正式合同,即它可以查看 any 的positionandneighbors字段Point,但不能查看vorw字段。(类似地,除了它可以得到的任何领域computeW之外,可能会查看任何东西。)实际上可以在类型级别强制执行此操作而无需太多体操,但现在让我们跳过它。wPoint

computeV, computeW :: Point -> Double
(computeV, computeW) = undefined

现在我们准备好构建我们的(标记的)内存图。

buildGraph :: [Vector] -> Array Int Point
buildGraph vs = answer where
    answer = listArray (0, length vs-1) [point pos | pos <- vs]
    point pos = this where
        this = Point
            { position = pos
            , v = computeV this
            , w = computeW this
            , neighbors = map (answer!) (findNeighbors pos vs)
            }

就是这样,真的。现在你可以写你的

newPositions :: Point -> [Vector]
newPositions = undefined

wherenewPositions可以完全自由地检查Point它的任何字段,并将所有功能放在一起:

update :: [Vector] -> [Vector]
update = newPositions <=< elems . buildGraph

编辑:...在开头解释“特殊类型的突变”评论:在评估期间,您可以期望当您要求wa 的字段时Point,事情会按以下顺序发生:computeW将强制该v字段;然后computeV将强制neighbors场;然后该neighbors字段将从未评估变为已评估;然后该v字段将从未评估变为已评估;然后该w字段将从未评估变为已评估。最后三个步骤看起来与 C++ 算法的三个变异步骤非常相似!

双重编辑:我决定我想看到这个东西运行,所以我用虚拟实现实例化了上面所有抽象的东西。我还想看到它只评估一次,因为我什至不确定我做对了!所以我打了几个trace电话。这是一个完整的文件:

import Control.Monad
import Data.Array
import Debug.Trace

announce s (Vector pos) = trace $ "computing " ++ s ++ " for position " ++ show pos

data Vector = Vector Double deriving Show

data Point = Point
    { position  :: Vector
    , v, w      :: Double
    , neighbors :: [Point]
    }

findNeighbors :: Vector -> [Vector] -> [Int]
findNeighbors (Vector n) vs = [i | (i, Vector n') <- zip [0..] vs, abs (n - n') < 1]

computeV, computeW :: Point -> Double
computeV (Point pos _ _ neighbors) = sum [n | Point { position = Vector n } <- neighbors]
computeW (Point pos v _ neighbors) = sum [v | Point { v = v } <- neighbors]

buildGraph :: [Vector] -> Array Int Point
buildGraph vs = answer where
    answer = listArray (0, length vs-1) [point pos | pos <- vs]
    point pos = this where { this = Point
        { position  = announce "position" pos $ pos
        , v         = announce "v" pos $ computeV this
        , w         = announce "w" pos $ computeW this
        , neighbors = announce "neighbors" pos $ map (answer!) (findNeighbors pos vs)
        } }

newPositions :: Point -> [Vector]
newPositions (Point { position = Vector n, v = v, w = w }) = [Vector (n*v), Vector w]

update :: [Vector] -> [Vector]
update = newPositions <=< elems . buildGraph

并在 ghci 中运行:

*Main> length . show . update . map Vector $ [0, 0.25, 0.75, 1.25, 35]
computing position for position 0.0
computing v for position 0.0
computing neighbors for position 0.0
computing position for position 0.25
computing position for position 0.75
computing w for position 0.0
computing v for position 0.25
computing neighbors for position 0.25
computing v for position 0.75
computing neighbors for position 0.75
computing position for position 1.25
computing w for position 0.25
computing w for position 0.75
computing v for position 1.25
computing neighbors for position 1.25
computing w for position 1.25
computing position for position 35.0
computing v for position 35.0
computing neighbors for position 35.0
computing w for position 35.0
123

如您所见,每个位置的每个字段最多计算一次。

于 2013-10-08T23:51:41.807 回答
3

你能做这样的事情吗?给定以下类型签名

calculateNeighbours :: [Point] -> [[Point]]

calculateV :: [Point] -> Double

calculateW :: [Point] -> Double -> Double

你可以写

algorithm :: [Point] -> [(Point, Double, Double)]
algorithm pts =                             -- pts  :: [Point]
    let nbrs = calculateNeighbours pts      -- nbrs :: [[Point]]
        vs   = map calculateV nbrs          -- vs   :: [Double]
        ws   = zipWith calculateW nbrs vs   -- ws   :: [Double]
     in zip3 pts vs ws                      --      :: [(Point,Double,Double)]

v这仅计算一次邻居列表,并在计算和时重新使用该值w

如果这不是你想要的,你能详细说明一下吗?

于 2013-10-04T09:17:29.563 回答
0

我认为您应该使用 Map (HashMap)分别存储从您的 's 计数的 v(和 w's)Point,或者使用可变变量来反映您的 C++ 算法。第一种方法更具“功能性”,例如,您可以轻松地将并行性添加到其中,因为所有数据都是不可变的,但它应该慢一点,因为每次需要逐点获取 v 时都必须计算哈希。

于 2013-10-07T20:59:21.130 回答