自从提出这个问题以来已经 7 年了,似乎仍然没有人想出一个好的解决方案来解决这个问题。Repa 没有mapM/ traverselike 功能,即使是可以在没有并行化的情况下运行的功能。此外,考虑到过去几年取得的进展,它似乎也不太可能发生。
由于 Haskell 中许多数组库的陈旧状态以及我对它们的功能集的总体不满,我在数组库中投入了几年的工作massiv,它借鉴了 Repa 的一些概念,但将其提升到了完全不同的水平。介绍就够了。
在今天之前,有三个类似单子映射的函数massiv(不包括类似函数的同义词:imapM,forM等):
mapM- 任意映射中的常用映射Monad。由于明显的原因不能并行化,而且速度也有点慢(按照通常的方式mapM,在列表中很慢)
traversePrim- 这里我们被限制为PrimMonad,它明显快于mapM,但其原因对于本次讨论并不重要。
mapIO- 顾名思义,这个仅限于IO(或者更确切地说MonadUnliftIO,但这无关紧要)。因为我们在其中,所以IO我们可以自动将数组拆分为与内核一样多的块,并使用单独的工作线程将IO操作映射到这些块中的每个元素。与也是可并行化的 pure 不同fmap,我们必须在IO这里,因为调度的不确定性以及映射操作的副作用。
所以,一旦我读到这个问题,我就想这个问题实际上已经解决了massiv,但没有那么快。随机数生成器,例如 inmwc-random和其他 inrandom-fu不能跨多个线程使用相同的生成器。这意味着,我缺少的唯一难题是:“为每个产生的线程绘制一个新的随机种子并照常进行”。换句话说,我需要两件事:
- 一个函数,它将初始化与工作线程一样多的生成器
- 以及一个抽象,它将根据操作在哪个线程中运行,无缝地为映射函数提供正确的生成器。
所以这正是我所做的。
首先,我将使用特制的randomArrayWS和initWorkerStates函数给出示例,因为它们与问题更相关,然后转到更一般的一元映射。以下是它们的类型签名:
randomArrayWS ::
(Mutable r ix e, MonadUnliftIO m, PrimMonad m)
=> WorkerStates g -- ^ Use `initWorkerStates` to initialize you per thread generators
-> Sz ix -- ^ Resulting size of the array
-> (g -> m e) -- ^ Generate the value using the per thread generator.
-> m (Array r ix e)
initWorkerStates :: MonadIO m => Comp -> (WorkerId -> m s) -> m (WorkerStates s)
对于那些不熟悉 的人massiv,这个Comp参数是一种计算策略,值得注意的构造函数是:
Seq- 按顺序运行计算,无需分叉任何线程
Par- 启动尽可能多的线程并使用它们来完成工作。
我mwc-random最初将使用包作为示例,然后移至RVarT:
λ> import Data.Massiv.Array
λ> import System.Random.MWC (createSystemRandom, uniformR)
λ> import System.Random.MWC.Distributions (standard)
λ> gens <- initWorkerStates Par (\_ -> createSystemRandom)
上面我们使用系统随机性为每个线程初始化了一个单独的生成器,但是我们也可以通过从WorkerId参数中派生一个唯一的每个线程种子来使用它,这只是Int工作人员的索引。现在我们可以使用这些生成器创建一个具有随机值的数组:
λ> randomArrayWS gens (Sz2 2 3) standard :: IO (Array P Ix2 Double)
Array P Par (Sz (2 :. 3))
[ [ -0.9066144845415213, 0.5264323240310042, -1.320943607597422 ]
, [ -0.6837929005619592, -0.3041255565826211, 6.53353089112833e-2 ]
]
通过使用Par策略,scheduler库将平均分配可用工作人员之间的生成工作,每个工作人员将使用它自己的生成器,从而使其线程安全。没有什么能阻止我们重复使用相同的WorkerStates任意次数,只要它不是同时完成的,否则会导致异常:
λ> randomArrayWS gens (Sz1 10) (uniformR (0, 9)) :: IO (Array P Ix1 Int)
Array P Par (Sz1 10)
[ 3, 6, 1, 2, 1, 7, 6, 0, 8, 8 ]
现在放在mwc-random一边,我们可以通过使用以下函数将相同的概念重用于其他可能的用例generateArrayWS:
generateArrayWS ::
(Mutable r ix e, MonadUnliftIO m, PrimMonad m)
=> WorkerStates s
-> Sz ix -- ^ size of new array
-> (ix -> s -> m e) -- ^ element generating action
-> m (Array r ix e)
和mapWS:
mapWS ::
(Source r' ix a, Mutable r ix b, MonadUnliftIO m, PrimMonad m)
=> WorkerStates s
-> (a -> s -> m b) -- ^ Mapping action
-> Array r' ix a -- ^ Source array
-> m (Array r ix b)
这是关于如何将此功能与rvar,random-fu和mersenne-random-pure64库一起使用的承诺示例。我们也可以在randomArrayWS这里使用,但是为了举例,假设我们已经有一个具有不同RVarTs 的数组,在这种情况下我们需要一个mapWS:
λ> import Data.Massiv.Array
λ> import Control.Scheduler (WorkerId(..), initWorkerStates)
λ> import Data.IORef
λ> import System.Random.Mersenne.Pure64 as MT
λ> import Data.RVar as RVar
λ> import Data.Random as Fu
λ> rvarArray = makeArrayR D Par (Sz2 3 9) (\ (i :. j) -> Fu.uniformT i j)
λ> mtState <- initWorkerStates Par (newIORef . MT.pureMT . fromIntegral . getWorkerId)
λ> mapWS mtState RVar.runRVarT rvarArray :: IO (Array P Ix2 Int)
Array P Par (Sz (3 :. 9))
[ [ 0, 1, 2, 2, 2, 4, 5, 0, 3 ]
, [ 1, 1, 1, 2, 3, 2, 6, 6, 2 ]
, [ 0, 1, 2, 3, 4, 4, 6, 7, 7 ]
]
需要注意的是,尽管在上面的示例中使用了 Mersenne Twister 的纯实现,但我们无法逃避 IO。这是因为非确定性调度,这意味着我们永远不知道哪个工作人员将处理数组的哪个块,因此哪个生成器将用于数组的哪个部分。从好的方面来说,如果生成器是纯且可拆分的,例如splitmix,那么我们可以使用纯的、确定性和可并行化的生成函数:randomArray,但这已经是另一回事了。