7

我正在使用 Haskell 从生物信息学领域实现一个主题查找算法。除了说它是分支定界中值字符串搜索之外,我不会详细介绍该算法。我曾计划通过实现并发方法(以及后来的 STM 方法)来使我的实现更有趣,以便获得多核加速但在使用以下标志编译之后

$ ghc -prof -auto-all -O2 -fllvm -threaded -rtsopts --make main 

并打印配置文件我看到了一些有趣的东西(也许很明显):

COST CENTRE      entries  %time %alloc  
hammingDistance  34677951  47.6   14.7  
motifs           4835446   43.8   71.1  

很明显,无需接近多核编程就可以获得显着的加速(尽管这已经完成,我只需要找到一些好的测试数据并为此整理出标准)。

无论如何,这两个功能都是纯功能性的,绝不是并发的。他们也在做一些非常简单的事情,所以我很惊讶他们花了这么多时间。这是他们的代码:

data NukeTide = A | T | C | G deriving (Read, Show, Eq, Ord, Enum)

type Motif = [NukeTide] 

hammingDistance :: Motif -> Motif -> Int
hammingDistance [] [] = 0
hammingDistance xs [] = 0 -- optimistic
hammingDistance [] ys = 0 -- optimistic
hammingDistance (x:xs) (y:ys) = case (x == y) of
    True  -> hammingDistance xs ys
    False -> 1 + hammingDistance xs ys

motifs :: Int -> [a] -> [[a]]
motifs n nukeTides = [ take n $ drop k nukeTides | k <- [0..length nukeTides - n] ]

请注意,在 hammingDistance 的两个参数中,我实际上可以假设 xs 将是 x 长并且 ys 将小于或等于它,如果这为改进提供了空间。

如您所见,hammingDistance 计算两个基序(核苷酸列表)之间的汉明距离。主题函数接受一个数字和一个列表并返回该长度的所有子字符串,例如:

> motifs 3 "hello world"
["hel","ell","llo","lo ","o w"," wo","wor","orl","rld"]

由于所涉及的算法过程非常简单,我想不出进一步优化它的方法。然而,我确实有两个关于我应该去哪里的猜测:

  1. HammingDistance:我使用的数据类型(NukeTides 和 [])很慢/很笨拙。这只是一个猜测,因为我不熟悉他们的实现,但我认为定义我自己的数据类型,虽然更易读,但可能涉及比我想要的更多的开销。模式匹配对我来说也是陌生的,我不知道这是微不足道的还是昂贵的。
  2. 主题:如果我没看错的话,70% 的内存分配是由主题完成的,我认为有时必须进行垃圾收集。再次使用通用列表可能会减慢我或列表理解的速度,因为我非常不清楚这样做的成本。

有人对这里的常规程序有什么建议吗?如果数据类型是问题所在,那么数组会是正确的答案吗?(我听说它们是装在盒子里的)

谢谢您的帮助。

编辑:我突然想到,如果我描述调用这两个函数的方式可能会很有用:

totalDistance :: Motif -> Int
totalDistance motif = sum $ map (minimum . map (hammingDistance motif) . motifs l) dna

此函数是另一个函数的结果,并在树中的节点周围传递。在树中的每个节点上,对核苷酸(长度 <= n,即如果 == n 则它是叶节点)进行评估,使用 totalDistance 对节点进行评分。从那时起,它就是典型的分支定界算法。

编辑:约翰要求我打印出我所做的改变,这实际上消除了图案的成本:

scoreFunction :: DNA -> Int -> (Motif -> Int)
scoreFunction dna l = totalDistance
    where
        -- The sum of the minimum hamming distance in each line of dna
        -- is given by totalDistance motif
        totalDistance motif = sum $ map (minimum . map (hammingDistance motif)) possibleMotifs
        possibleMotifs = map (motifs l) dna -- Previously this was computed in the line above

我在原始帖子中没有说清楚,但 scoreFunction 只调用一次,结果在树遍历/分支中传递并绑定并用于评估节点。回想起来,在每一步重新计算图案并不是我做过的最聪明的事情之一。

4

3 回答 3

7

您的定义motifs看起来比必要的遍历要多得多,因为每个应用程序drop都必须从头开始遍历列表。我会Data.List.tails改为使用它来实现它:

motifs2 :: Int -> [a] -> [[a]]
motifs2 n nukeTides = map (take n) $ take count $ tails nukeTides
  where count = length nukeTides - n + 1

GHCi 中的快速比较显示了差异(sum . map length用于强制评估):

*Main> let xs = concat (replicate 10000 [A, T, C, G])
(0.06 secs, 17914912 bytes)
*Main> sum . map length $ motifs 5 xs
199980
(3.47 secs, 56561208 bytes)
*Main> sum . map length $ motifs2 5 xs
199980
(0.15 secs, 47978952 bytes)
于 2011-10-09T15:36:54.053 回答
6

你的定义hammingDistance可能比它可能的效率低得多。

hammingDistance (x:xs) (y:ys) = case (x == y) of
    True  -> hammingDistance xs ys
    False -> 1 + hammingDistance xs ys

由于 haskell 的懒惰,这将扩展为(在最坏的情况下):

(1 + (1 + (1 + ...)))

它将作为堆栈上的 thunk 存在,只有在使用时才会减少。这是否真的是一个问题取决于调用站点、编译器选项等,因此以完全避免此问题的形式编写代码通常是一种好习惯。

一个常见的解决方案是创建一个带有严格累加器的尾递归形式,但在这种情况下,您可以使用高阶函数,如下所示:

hammingDistance :: Motif -> Motif -> Int
hammingDistance xs ys = length . filter (uncurry (==)) $ zip xs ys

这是尾递归实现,用于比较

hammingDistance :: Motif -> Motif -> Int
hammingDistance xs ys = go 0 xs ys
  where
    go !acc [] [] = acc
    go !acc xs [] = acc -- optimistic
    go !acc [] ys = acc -- optimistic
    go !acc (x:xs) (y:ys) = case (x == y) of
      True  -> go acc xs ys
      False -> go (acc+1) xs ys

这使用BangPatterns扩展来强制对累加器进行严格评估,否则它将与您当前的定义有相同的问题。

直接回答您的其他一些问题:

  1. 模式匹配是微不足道的
  2. 您应该使用列表还是数组主要取决于数据的创建方式和使用方式。对于这种情况,列表可能是最好的类型。特别是,如果您的列表在创建时全部被使用,并且您不需要内存中的整个列表,那么它们应该没问题。但是,如果您确实将列表保留在内存中,那么它们会占用大量空间。

使用模式

我认为您使用这些功能的方式也做了一些额外的工作:

(minimum . map (hammingDistance motif) . motifs l

由于您只需要 minimum hammingDistance,因此您可能正在计算许多不必要的额外值。我可以想到两种解决方案:

选项1.定义一个新函数hammingDistanceThresh :: Motif -> Int -> Motif -> Int,当它超过阈值时停止。稍微奇怪的类型排序是为了方便在折叠中使用它,如下所示:

let motifs' = motifs l
in foldl' (hammingDistanceThresh motif) (hammingDistance motif $ head motifs') (tail motifs')

选项 2。如果您定义了惰性自然数类型,则可以使用它而不是Ints 作为 的结果hammingDistance。然后只计算必要的汉明距离。

最后一点:使用-auto-all将非常频繁地生成比其他分析选项慢得多的代码。我建议您-auto先尝试使用,然后SCC在必要时添加手动注释。

于 2011-10-09T19:41:57.320 回答
2

是的...我无法抗拒达到极限并编写了一个普通金属式的打包位实现:

{-# language TypeSynonymInstances #-}
{-# language BangPatterns #-}

import Data.Bits
import Data.Word


data NukeTide = A | T | C | G deriving (Read, Show, Eq, Ord, Enum)

type UnpackedMotif = [NukeTide] 

type PackageType = Word32
nukesInPackage = 16 :: Int
allSetMask = complement 0 :: PackageType


-- Be careful to have length of motif == nukesInPackage here!
packNukesToWord :: UnpackedMotif -> PackageType
packNukesToWord = packAt 0
    where packAt _ [] = 0
          packAt i (m:ml) =    (b0 m .&. bit i)
                           .|. (b1 m .&. bit (i+1))
                           .|. packAt (i+2) ml
          b0 A = 0
          b0 T = allSetMask
          b0 C = 0
          b0 G = allSetMask
          b1 A = 0
          b1 T = 0
          b1 C = allSetMask
          b1 G = allSetMask

unpackNukesWord :: PackageType -> UnpackedMotif
unpackNukesWord = unpackNNukesFromWord nukesInPackage

unpackNNukesFromWord :: Int -> PackageType -> UnpackedMotif
unpackNNukesFromWord = unpackN
    where unpackN 0 _ = []
          unpackN i w = (nukeOf $ w .&. r2Mask):(unpackN (i-1) $ w`shiftR`2)
          nukeOf bs
           | bs == 0      = A
           | bs == bit 0  = T
           | bs == bit 1  = C
           | otherwise    = G
          r2Mask = (bit 1 .|. bit 0) :: PackageType


data PackedMotif = PackedMotif { motifPackets::[PackageType]
                               , nukesInLastPack::Int        }
 -- note nukesInLastPack will never be zero; motifPackets must be [] to represent empty motifs.
packNukes :: UnpackedMotif -> PackedMotif
packNukes m = case remain of
               [] -> PackedMotif [packNukesToWord takeN] (length takeN)
               r  -> prAppend (packNukesToWord takeN) (packNukes r)
    where (takeN, remain) = splitAt nukesInPackage m
          prAppend w (PackedMotif l i) = PackedMotif (w:l) i

unpackNukes :: PackedMotif -> UnpackedMotif
unpackNukes (PackedMotif l i) = unpack l i
  where unpack [l] i = unpackNNukesFromWord i l
        unpack (l:ls) i = unpackNukesWord l ++ unpack ls i
        unpack [] _ = []

instance Show PackedMotif where
  show = show . unpackNukes



class Nukes a where
  pLength :: a -> Int
  shiftLN1 :: a -> a
  hammingDistance :: a -> a -> Int
  motifs :: Int -> a -> [a]

instance Nukes PackageType where
  pLength _ = nukesInPackage
  shiftLN1 = (`shiftR`2)
  hammingDistance !x !y = fromIntegral $ abt (x `xor` y)
      where abt !b = bbt(b.&.a0Mask .|. ((b.&.a1Mask) `shiftR` 1))
            bbt !b = sbt $ (b.&.r16Mask) + (b `shiftR` nukesInPackage)
            sbt !b = (r2Mask .&. b)             + (r2Mask .&. (b`shiftR`2))
                   + (r2Mask .&. (b`shiftR`4))  + (r2Mask .&. (b`shiftR`6))
                   + (r2Mask .&. (b`shiftR`8))  + (r2Mask .&. (b`shiftR`10))
                   + (r2Mask .&. (b`shiftR`12)) + (r2Mask .&. (b`shiftR`14))
            a0Mask = 0x55555555 :: PackageType
            a1Mask = 0xAAAAAAAA :: PackageType
            r16Mask = 0xFFFF :: PackageType
            r2Mask = 0x3 :: PackageType
  motifs 0 _ = []
  motifs l x = x : motifs (l-1) (shiftLN1 x)


maskNukesBut :: Int -> PackageType -> PackageType
maskNukesBut i = ( ( allSetMask `shiftR` (2*(nukesInPackage - i)) ) .&.)

instance Nukes PackedMotif where
  pLength (PackedMotif (x:xs) ix) = nukesInPackage * (length xs) + ix
  pLength _ = 0
  shiftLN1 ξ@(PackedMotif [] _) = ξ
  shiftLN1 (PackedMotif [x] ix) | ix>1       = PackedMotif [x`shiftR`2] (ix-1)
                                | otherwise  = PackedMotif [] nukesInPackage
  shiftLN1 (PackedMotif (x:x':xs) ix)
        = PackedMotif (( shiftLN1 x .|. pnext ):sxs) resLMod
      where sxs = motifPackets $ shiftLN1 (PackedMotif (x':xs) ix)
            pnext = shiftL (x'.&.0x3) 30
            resLMod = if ix > 1 then (ix-1) else nukesInPackage
  hammingDistance xs ys = go 0 xs ys
    where
      go :: Int -> PackedMotif -> PackedMotif -> Int
      go !acc (PackedMotif [x] ix) (PackedMotif [y] iy)
       | ix > iy    = acc + (hammingDistance y $ maskNukesBut iy x)
       | otherwise  = acc + (hammingDistance x $ maskNukesBut ix y)
      go !acc (PackedMotif [x] ix) (PackedMotif (y:ys) iy)
        = acc + (hammingDistance x $ maskNukesBut ix y)
      go !acc (PackedMotif (x:xs) ix) (PackedMotif [y] iy)
        = acc + (hammingDistance y $ maskNukesBut iy x)
      go !acc (PackedMotif (x:xs) ix) (PackedMotif (y:ys) iy)
        = go (acc + hammingDistance x y) (PackedMotif xs ix) (PackedMotif ys iy)
      go !acc _ _ = acc
  motifs l ξ
     | l>0        = fShfts (min nukesInPackage $ pLength ξ + 1 - l) ξ >>= ct
     | otherwise  = []
    where fShfts k χ | k > 0      = χ : fShfts (k-1) (shiftLN1 χ)
                     | otherwise  = []
          ct (PackedMotif ys iy) = case remain of
                [] -> if (length takeN - 1) * nukesInPackage + iy >= l
                       then [PackedMotif takeN lMod] else []
                _  -> PackedMotif takeN lMod : ct(PackedMotif (tail ys) iy)
            where (takeN, remain) = splitAt lQuot ys
          (lQuot,lMod) = case l `quotRem` nukesInPackage of
                   (i,0) -> (i, nukesInPackage)
                   (i,m) -> (i+1, m)

它可以从UnpackedMotif = [NukeTide]s 与packNukes函数一起使用,例如

*BioNuke0> motifs 23 $ packNukes $ take 27 $ cycle [A,T,G,C,A]
[[A,T,G,C,A,A,T,G,C,A,A,T,G,C,A,A,T,G,C,A,A,T,G],[T,G,C,A,A,T,G,C,A,A,T,G,C,A,A,T,G,C,A,A,T,G,C],[G,C,A,A,T,G,C,A,A,T,G,C,A,A,T,G,C,A,A,T,G,C,A],[C,A,A,T,G,C,A,A,T,G,C,A,A,T,G,C,A,A,T,G,C,A,A],[A,A,T,G,C,A,A,T,G,C,A,A,T,G,C,A,A,T,G,C,A,A,T]]

*BioNuke0> hammingDistance (packNukes [A,T,G,C,A,A,T,G]) (packNukes [A,T,C,C,A,T,G])
3

*BioNuke0> map (hammingDistance (packNukes $ take 52 $ cycle [A,T,C,C,A,T,G])) (motifs 52 $ packNukes $ take 523 $ cycle [A,T,C,C,A,T,G])
[0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44,38,52,0,52,36,45,44]

我还没有将性能与原始版本进行比较,但它应该比任何代数数据类型实现都要快很多。另外,它很容易提供一种节省空间的存储格式。

于 2011-10-10T02:12:44.340 回答