8

我想使用Vectors 在 Haskell 中编写 Floyd-Warshall 所有对最短路径算法的有效实现,以期获得良好的性能。

实现非常简单,但不是使用 3 维 |V|×|V|×|V| 矩阵,使用二维向量,因为我们只读取前一个k值。

因此,该算法实际上只是传入 2D 向量并生成新的 2D 向量的一系列步骤。最终的 2D 向量包含所有节点 (i,j) 之间的最短路径。

我的直觉告诉我,确保在每一步之前评估先前的 2D 向量很重要,所以我在函数BangPatternsprev参数fw和 strict上使用了foldl'

{-# Language BangPatterns #-}

import           Control.DeepSeq
import           Control.Monad       (forM_)
import           Data.List           (foldl')
import qualified Data.Map.Strict     as M
import           Data.Vector         (Vector, (!), (//))
import qualified Data.Vector         as V
import qualified Data.Vector.Mutable as V hiding (length, replicate, take)

type Graph = Vector (M.Map Int Double)
type TwoDVector = Vector (Vector Double)

infinity :: Double
infinity = 1/0

-- calculate shortest path between all pairs in the given graph, if there are
-- negative cycles, return Nothing
allPairsShortestPaths :: Graph -> Int -> Maybe TwoDVector
allPairsShortestPaths g v =
  let initial = fw g v V.empty 0
      results = foldl' (fw g v) initial [1..v]
  in if negCycle results
        then Nothing
        else Just results
  where -- check for negative elements along the diagonal
        negCycle a = any not $ map (\i -> a ! i ! i >= 0) [0..(V.length a-1)]

-- one step of the Floyd-Warshall algorithm
fw :: Graph -> Int -> TwoDVector -> Int -> TwoDVector
fw g v !prev k = V.create $ do                                           -- ← bang
  curr <- V.new v
  forM_ [0..(v-1)] $ \i ->
    V.write curr i $ V.create $ do
      ivec <- V.new v
      forM_ [0..(v-1)] $ \j -> do
        let d = distance g prev i j k
        V.write ivec j d
      return ivec
  return curr

distance :: Graph -> TwoDVector -> Int -> Int -> Int -> Double
distance g _ i j 0 -- base case; 0 if same vertex, edge weight if neighbours
  | i == j    = 0.0
  | otherwise = M.findWithDefault infinity j (g ! i)
distance _ a i j k = let c1 = a ! i ! j
                        c2 = (a ! i ! (k-1))+(a ! (k-1) ! j)
                        in min c1 c2

但是,当使用 1000 个节点和 47978 条边的图运行这个程序时,事情看起来一点也不好看。内存使用率非常高,程序运行时间太长。该程序是用ghc -O2.

我重建了分析程序,并将迭代次数限制为 50:

 results = foldl' (fw g v) initial [1..50]

然后我用+RTS -p -hcand运行程序+RTS -p -hd

这……很有趣,但我想这表明它正在积累大量的 thunk。不好。

好的,所以在黑暗中拍摄了几张照片后,我添加了一个deepseqinfw以确保prev 真的被评估:

let d = prev `deepseq` distance g prev i j k

现在情况看起来好多了,我实际上可以在不断使用内存的情况下运行程序完成。很明显,prev论点的轰动是不够的。

为了与之前的图表进行比较,以下是添加 50 次迭代后的内存使用情况deepseq

好的,事情好多了,但我还有一些问题:

  1. 这是这个空间泄漏的正确解决方案吗?我觉得插入deepseqa 有点难看是错误的?
  2. Vector在这里使用 s 是惯用的/正确的吗?我正在为每次迭代构建一个全新的向量,并希望垃圾收集器会删除旧Vector的 s。
  3. 我还能做些什么来让这种方法运行得更快吗?

作为参考,这里是graph.txthttp ://sebsauvage.net/paste/?45147f7caf8c5f29#7tiCiPovPHWRm1XNvrSb/zNl3ujF3xB3yehrxhEdVWw=

这里是main

main = do
  ls <- fmap lines $ readFile "graph.txt"
  let numVerts = head . map read . words . head $ ls
  let edges = map (map read . words) (tail ls)
  let g = V.create $ do
        g' <- V.new numVerts
        forM_ [0..(numVerts-1)] (\idx -> V.write g' idx M.empty)
        forM_ edges $ \[f,t,w] -> do
          -- subtract one from vertex IDs so we can index directly
          curr <- V.read g' (f-1)
          V.write g' (f-1) $ M.insert (t-1) (fromIntegral w) curr
        return g'
  let a = allPairsShortestPaths g numVerts
  case a of
    Nothing -> putStrLn "Negative cycle detected."
    Just a' -> do
      putStrLn  $ "The shortest, shortest path has length "
              ++ show ((V.minimum . V.map V.minimum) a')
4

1 回答 1

5

First, some general code cleanup:

In your fw function, you explicitly allocate and fill mutable vectors. However, there is a premade function for this exact purpose, namely generate. fw can therefore be rewritten as

V.generate v (\i -> V.generate v (\j -> distance g prev i j k))

Similarly, the graph generation code can be replaced with replicate and accum:

let parsedEdges = map (\[f,t,w] -> (f - 1, (t - 1, fromIntegral w))) edges
let g = V.accum (flip (uncurry M.insert)) (V.replicate numVerts M.empty) parsedEdges

Note that this totally removes all need for mutation, without losing any performance.

Now, to the actual questions:

  1. In my experience, deepseq is very useful, but only as quick fix to space leaks like this one. The fundamental problem is not that you need to force the results after you've produced them. Instead, the use of deepseq implies that you should have been building the structure more strictly in the first place. In fact, if you add a bang pattern in your vector creation code like so:

    let !d = distance g prev i j k
    

    Then the problem is fixed without deepseq. Note that this doesn't work with the generate code, because, for some reason (I might create a feature request for this), vector does not provide strict functions for boxed vectors. However, when I get to unboxed vectors in answer to question 3, which are strict, both approaches work without strictness annotations.

  2. As far as I know, the pattern of repeatedly generating new vectors is idiomatic. The only thing not idiomatic is the use of mutability - except when they are strictly necessary, mutable vectors are generally discouraged.

  3. There are a couple of things to do:

    • Most simply, you can replace Map Int with IntMap. As that isn't really the slow point of the function, this doesn't matter too much, but IntMap can be much faster for heavy workloads.

    • You can switch to using unboxed vectors. Although the outer vector has to remain boxed, as vectors of vectors can't be unboxed, the inner vector can be. This also solves your strictness problem - because unboxed vectors are strict in their elements, you don't get a space leak. Note that on my machine, this improves the performance from 4.1 seconds to 1.3 seconds, so the unboxing is very helpful.

    • You can flatten the vector into a single one and use multiplication and division to switch between two dimensional indicies and one dimentional indicies. I don't recommend this, as it is a bit involved, quite ugly, and, due to the division, actually slows down the code on my machine.

    • You can use repa. This has the huge advantage of automatically parallelizing your code. Note that, since repa flattens its arrays and apparently doesn't properly get rid of the divisions needed to fill nicely (it's possible to do with nested loops, but I think it uses a single loop and a division), it has the same performance penalty as I mentioned above, bringing the runtime from 1.3 seconds to 1.8. However, if you enable parallelism and use a multicore machine, you start seeing some benifits. Unfortunately, you current test case is too tiny to see much benifit, so, on my 6 core machine, I see it drop back down to 1.2 seconds. If I up the size back to [1..v] instead of [1..50], the parallelism brings it from 32 seconds to 13. Presumably, if you give this program a larger input, you might see more benifit.

      If you're interested, I've posted my repa-ified version here.

    • EDIT: Use -fllvm. Testing on my computer, using repa, I get 14.7 seconds without parallelism, which is almost as good as without -fllvm and with parallelism. In general, LLVM can just handle array based code like this very well.

于 2014-01-28T21:34:58.763 回答