4

我正在编写一个简单的脚本来使用Shelly库并行运行一堆任务,但我想限制任何时候运行的最大任务数。该脚本获取一个文件,每行都有一个输入,并为该输入运行一个任务。文件中有几百个输入,我想一次限制为大约 16 个进程。

当前脚本实际上使用初始计数为 1 的 QSem 限制为 1(很好地尝试)。我似乎遗漏了一些东西,因为当我在具有 4 个输入的测试文件上运行时,我看到了这个:

开始
开始
开始
开始
完毕
完毕
完毕
完毕

所以线程并没有像我期望的那样阻塞在 QSem 上,它们都是同时运行的。我什至已经实现了我自己的信号量MVarTVar而且都没有按我预期的方式工作。我显然错过了一些基本的东西,但是什么?我还尝试编译代码并将其作为二进制文件运行。

#!/usr/bin/env runhaskell
{-# LANGUAGE TemplateHaskell, QuasiQuotes, DeriveDataTypeable, OverloadedStrings #-}

进口雪莉
导入前奏隐藏(FilePath)
导入 Text.Shakespeare.Text (lt)
导入合格的 Data.Text.Lazy 作为 LT
导入 Control.Monad (forM)
导入 System.Environment (getArgs)

将合格的 Control.Concurrent.QSem 导入为 QSem
导入 Control.Concurrent (forkIO, MVar, putMVar, newEmptyMVar, takeMVar)

-- 定义最大并发进程数
maxProcesses :: IO QSem.QSem
maxProcesses = QSem.newQSem 1

bkGrnd :: ShIO a -> ShIO (MVar a)
bkGrnd 过程 = 做
  mvar <- liftIO newEmptyMVar
  _ <- liftIO $ forkIO $ 做
    -- 阻塞直到有空闲进程
    sem <- maxProcesses
    QSem.waitQSem sem
    putStrLn "开始"
    -- 运行shell命令
    结果 <- shelly $ 静默处理
    liftIO $ putMVar mvar 结果
    putStrLn “完成”
    -- 表示这个过程已经完成并且另一个可以运行。
    QSem.signalQSem sem
  返回 mvar

主::IO()
main = shelly $ 默默地 $ do
    [img, 文件] <- liftIO $ getArgs
    内容 <- readfile $ fromText $ LT.pack 文件
    -- 为每一行输入运行一个后台进程。
    结果 <- forM (LT.lines 内容) $ \line -> bkGrnd $ do
      runStdin <命令> <参数>
    LiftIO $mapM_takeMVar 结果
4

3 回答 3

6

正如我在评论中所说,每次调用bkGrnd都会创建自己的信号量,从而允许每个线程继续进行而无需等待。我会尝试这样的事情,其中​​信号量是在 中创建main并每次传递给bkGrnd.

bkGrnd :: QSem.QSem -> ShIO a -> ShIO (MVar a)
bkGrnd sem proc = do
  mvar <- liftIO newEmptyMVar
  _ <- liftIO $ forkIO $ do
    -- Block until there are free processes
    QSem.waitQSem sem
    --
    -- code continues as before
    --

main :: IO ()
main = shelly $ silently $ do
    [img, file] <- liftIO $ getArgs
    contents <- readfile $ fromText $ LT.pack file
    sem <- maxProcesses
    -- Run a backgrounded process for each line of input.
    results <- forM (LT.lines contents) $ \line -> bkGrnd sem $ do
      runStdin <command> <arguments>
    liftIO $ mapM_ takeMVar results
于 2012-04-04T16:23:17.720 回答
4

你有一个答案,但我需要补充一点:如果 killThread 或异步线程死亡是可能的,则 QSem 和 QSemN 不是线程安全的。

我的错误报告和补丁是GHC trac ticket #3160。固定代码可用作名为SafeSemaphore的新库,带有模块 Control.Concurrent.MSem、MSemN、MSampleVar 和奖励 FairRWLock。

于 2012-04-05T08:24:43.820 回答
0

不是更好吗

bkGrnd sem proc = do
  QSem.waitQSem sem
  mvar <- liftIO newEmptyMVar
  _ <- liftIO $ forkIO $ do
  ...

所以甚至forkIO直到你得到信号量?

于 2013-08-25T02:08:01.060 回答