13

我试图在 Haskell 中并行运行多个下载,我通常只使用 Control.Concurrent.Async.mapConcurrently 函数。但是,这样做会打开大约 3000 个连接,这会导致 Web 服务器拒绝所有连接。是否可以完成与 mapConcurrently 相同的任务,但一次只能打开有限数量的连接(即一次只有 2 个或 4 个)?

4

5 回答 5

19

一个快速的解决方案是使用信号量来限制并发操作的数量。这不是最佳的(所有线程都是一次创建然后等待),但是可以:

import Control.Concurrent.MSem
import Control.Concurrent.Async
import Control.Concurrent (threadDelay)
import qualified Data.Traversable as T

mapPool :: T.Traversable t => Int -> (a -> IO b) -> t a -> IO (t b)
mapPool max f xs = do
    sem <- new max
    mapConcurrently (with sem . f) xs

-- A little test:
main = mapPool 10 (\x -> threadDelay 1000000 >> print x) [1..100]
于 2013-09-19T15:25:42.407 回答
9

您也可以尝试pooled-io包,您可以在其中编写:

import qualified Control.Concurrent.PooledIO.Final as Pool
import Control.DeepSeq (NFData)
import Data.Traversable (Traversable, traverse)

mapPool ::
   (Traversable t, NFData b) =>
   Int -> (a -> IO b) -> t a -> IO (t b)
mapPool n f = Pool.runLimited n . traverse (Pool.fork . f)
于 2014-01-17T16:54:41.390 回答
3

Control.Concurrent.Spawn使用库很容易做到这一点:

import Control.Concurrent.Spawn

type URL      = String
type Response = String    

numMaxConcurrentThreads = 4

getURLs :: [URL] -> IO [Response]
getURLs urlList = do
   wrap <- pool numMaxConcurrentThreads
   parMapIO (wrap . fetchURL) urlList

fetchURL :: URL -> IO Response
于 2016-03-20T22:50:34.483 回答
1

如果其中一些线程的持续时间明显长于其他线程,则将线程分块可能效率低下。这是一个更平滑但更复杂的解决方案:

{-# LANGUAGE TupleSections #-}
import Control.Concurrent.Async (async, waitAny)
import Data.List                (delete, sortBy)
import Data.Ord                 (comparing)

concurrentlyLimited :: Int -> [IO a] -> IO [a]
concurrentlyLimited n tasks = concurrentlyLimited' n (zip [0..] tasks) [] []

concurrentlyLimited' _ [] [] results = return . map snd $ sortBy (comparing fst) results
concurrentlyLimited' 0 todo ongoing results = do
    (task, newResult) <- waitAny ongoing
    concurrentlyLimited' 1 todo (delete task ongoing) (newResult:results)
concurrentlyLimited' n [] ongoing results = concurrentlyLimited' 0 [] ongoing results
concurrentlyLimited' n ((i, task):otherTasks) ongoing results = do
    t <- async $ (i,) <$> task
    concurrentlyLimited' (n-1) otherTasks (t:ongoing) results

注意:上面的代码可以使用一个实例MonadBaseControl IO代替来变得更通用IO,这要归功于lifted-async.

于 2014-03-26T22:53:06.787 回答
0

如果您在列表中有操作,则此操作具有较少的依赖关系

import Control.Concurrent.Async (mapConcurrently)
import Data.List.Split (chunksOf)

mapConcurrentChunks :: Int -> (a -> IO b) -> [a] -> IO [b]
mapConcurrentChunks n ioa xs = concat <$> mapM (mapConcurrently ioa) (chunksOf n xs)

编辑:只是缩短了一点

于 2014-01-17T18:10:14.980 回答