我尝试了两种解决方案。第一个使用Par
单子(即Control.Monad.Par
):
import Control.Monad.Par (Par, NFData)
import Control.Monad.Par.Combinator (parMap)
import Data.Maybe (catMaybes)
import Data.List.Split (chunksOf)
takeJustsPar :: (NFData b) => Int -> Int -> (a -> Maybe b) -> [a] -> Par [b]
takeJustsPar n chunkSize f as = go n (chunksOf chunkSize as) where
go _ [] = return []
go 0 _ = return []
go numNeeded (chunk:chunks) = do
evaluatedChunk <- parMap f chunk
let results = catMaybes evaluatedChunk
numFound = length results
numRemaining = numNeeded - numFound
fmap (results ++) $ go numRemaining chunks
使用的第二次尝试Control.Parallel.Strategies
:
import Control.Parallel.Strategies
import Data.List.Split (chunksOf)
chunkPar :: (NFData a) => Int -> Int -> [a] -> [a]
chunkPar innerSize outerSize as
= concat ((chunksOf innerSize as) `using` (parBuffer outerSize rdeepseq))
后者最终变得更加可组合,因为我可以写:
take n $ catMaybes $ chunkPar 1000 10 $ map expensiveFunction xs
...而不是将take
andcatMaybes
行为融入并行策略。
后一种解决方案也提供了近乎完美的利用。在我测试的令人尴尬的并行问题上,它为 8 个内核提供了 99% 的利用率。我没有测试Par
monad 的利用率,因为我是借用同事的电脑,不想在我满意的性能时浪费他们的时间Control.Parallel.Strategies
。
所以答案是使用Control.Parallel.Strategies
,它提供了更多的可组合行为和出色的多核利用率。