0

我目前正在玩 Polysemy,重写我的一个小玩具项目以适应它。我偶然发现了一段使用 的代码pooledMapConcurrentlyN,因此基本上是具有有限并发的并行版本。

我可以将我的示例简化为:

foo :: Sem r Int
foo = do
  res <- pooledMapConcurrentlyN 3 action (["foo", "bar", "baz"] :: [String])
  pure $ sum res

action :: String -> Sem r Int 
action = pure. length

这不会编译,因为没有MonadUnliftIO (Sem r). 它在我使用时会编译traverse,但我正在寻找一个并发版本。我不确定我现在应该走哪条路。

我看到以下选项:

  • 实现一个MonadUnliftIO (Sem r)实例。我看到在这个 GitHub issue中有一些关于添加/实现这样一个实例的讨论。但是,我不清楚这样做是否是个好主意。
  • 使用除此之外的东西pooledMapConcurrentlyN会给我一个等效的行为。我知道有parTraverse来自par-dual包,但这需要一个ParDual实例。该parallel软件包也可以使解决方案成为可能,但是我对此并不熟悉,因此无法确定是否可行。
  • 将平行遍历建模为效果。我试过了,但我无法实现效果。我试过的效果定义是这样的:
data ParTraverse m a where
  TraverseP :: (Traversable t) => Int -> (a -> m b) -> t a -> ParTraverse m (t b)

我对 GADT 和 Polysemy 都不是很熟悉,所以我可能在这里遗漏了一些明显的东西。


编辑:正如下面的答案所指出的,最合适的解决方案是将其建模为效果并在效果解释中处理并发,而不是在业务逻辑中处理。这意味着我正在寻找类似于上述ParTraverse效果的更高阶效果(?):

data ParTraverse m a where
  TraverseP :: (Traversable t) => (a -> m b) -> t a -> ParTraverse m (t b)

makeSem ''ParTraverse

parTraverseToIO :: (Member (Embed IO) r) => Sem (ParTraverse ': r) a -> Sem r a
parTraverseToIO = interpretH $ \case
  TraverseP f ta -> do
    _something

我不确定这种类型签名是否正确(动作应该有 typea -> Sem r b吗?for 的签名traverse有一个Applicative约束m,我将如何建模?)

4

2 回答 2

3

至于ParTraverse实现,这是我在 github 上回复的,专门用于 for 的[]版本t

pooledMapConcurrently :: Member (Final IO) r => Int -> (a -> Sem r b) -> [a] -> Sem r [Maybe b]
pooledMapConcurrently num f ta =
  ...

data ParTraverse m a where
  TraverseP :: (a -> m b) -> [a] -> ParTraverse m [b]

makeSem ''ParTraverse

parTraverseToIO :: (Member (Final IO) r) => InterpreterFor ParTraverse r
parTraverseToIO =
  interpretH \case
   TraverseP f ta -> do
     taT <- traverse pureT ta
     fT <- bindT f
     tb <- raise (parTraverseToIO (pooledMapConcurrently 1 fT taT))
     ins <- getInspectorT
     pureT (catMaybes (inspect ins <$> catMaybes tb))

interpretH对于我们在环境中操作的内部使用的组合器的一些解释Tactical

  • 由于我们正在处理一个函数a -> m b,其中在解释器内部m被实例化Sem rInitial,我们必须使用它bindT来获取一个类似于 的函数,它是f a -> Sem r (f b)解释f器的一元状态。
  • 我们不能直接pooledMapConcurrently在 上运行Sem rInitial,因为Member (Final IO)只给出了 for r
  • ta包含 for 的输入f,但由于我们将其提升为 expect f a,因此我们还必须调用pureT, 的每个元素tatraverse因为它是一个单子动作。
  • bindT(and runT) 产生的函数在头部产生Sem仍然具有当前效果的 s ParTraverse,因为效果必须在包装内解释Sem(传入 as a -> m b)。这甚至允许为内部程序使用不同的解释器。在我们的例子中,我们只是再次运行parTraverseToIO结果f。之后,我们必须将它提升SemTactical环境中(这只是头部的另一个效果),所以我们使用raise.
  • 由于我们的提升f产生f (Maybe b)了结果,我们需要解压它以获得正确的返回类型。为此,我们可以使用检查器,它转换fMaybe,给我们Maybe (Maybe b),然后我们可以将其展平为一个列表。

为了完整起见,这里是pooledMapConcurrently由 KingoftheHomeless 编写的 的实现:

pooledMapConcurrently :: (Member (Final IO) r, Traversable t) => Int -> (a -> Sem r b) -> t a -> Sem r (t (Maybe b))
pooledMapConcurrently i f t = withWeavingToFinal $ \s wv ins ->
  (<$ s) <$> pooledMapConcurrentlyIO i (\a -> ins <$> wv (f a <$ s)) t
于 2020-10-15T20:51:00.903 回答
1

然后我会尝试:

  1. 不要将并发作为业务逻辑的影响
  2. 在你的解释器中使用pooledMapconcurrentlyIO + embed

所以你会有这样的东西

data GetThings m a where
  GetThings :: [InfoToFetchThing] -> GetThings m [Thing]

runGetThingsConcurrently :: Member (Embed IO) r => Sem (GetThings ': r) a -> Sem r a
runGetThingsConcurrently = interpret \case
  GetThings infos -> do
  ...
  embed $ pooledMapConcurrentlyIO 42 <fetch-action> infos

当然,您也可以对其进行大量自定义 -Traversable使用列表而不是列表,<fetch-action>作为参数传递给解释器,将您想要的线程数作为参数传递给解释器,等等。

编辑:由于要执行的操作也需要在 Sem r 中,而不是在 IO 中,因此您可以使用withWeavingToFinal(可能)从 Sem r 中获取 IO,如链接中所述。

于 2020-10-13T18:24:57.440 回答