3

考虑以下 Haskell 程序(我这样做主要是为了学习目的):

import qualified Control.Concurrent.MSem as Sem
import System.Environment (getArgs)
import Control.Concurrent (forkIO)
import Control.Monad

-- Traverse with maximum n threads
parallelTraverse :: Foldable a => Int -> (b -> IO()) -> a b -> IO ()
parallelTraverse n action values = do
  sem <- Sem.new n
  forM_ values $ \value -> Sem.with sem (forkIO $ action value)

main :: IO ()
main = do
  args <- getArgs
  let nThreads = read . head $ args :: Int
  parallelTraverse nThreads print [(1::Int)..]

当我运行它时,内存迅速攀升至数 GB。我尝试了各种组合以确保丢弃中间计算的结果(打印操作)。为什么还漏空间?

4

1 回答 1

6

首先,你在下面的文章中有一个明显的错误:

Sem.with sem (forkIO $ action value)

您正在围绕“fork”操作而不是那里的操作处理来自主线程的信号量。以下是实现它的正确方法:

forkIO (Sem.with sem (action value))

即,从分叉线程的上下文中处理信号量。

其次,在以下代码中,您将parallelTraverse在无限列表上调用操作:

parallelTraverse nThreads print [(1::Int)..]

这导致线程的无限分叉。而且由于forkIO调用线程的操作大致是即时的,因此您很快就会耗尽资源也就不足为奇了。


要使用信号量来限制工作线程的数量,该with模式在您的情况下根本不会这样做。相反,您应该使用 and 的显式组合,wait并且signal不要忘记正确处理异常(以防万一)。例如,:

parallelTraverse :: Foldable a => Int -> (b -> IO()) -> a b -> IO ()
parallelTraverse n action values = do
  sem <- Sem.new n
  forM_ values $ \value -> do
    Sem.wait sem
    forkIO $ finally (action value) (Sem.signal sem)
于 2015-09-07T20:12:18.713 回答