4

我正在玩弄 Haskell 线程,并且遇到了跨通道传达惰性求值值的问题。例如,在 N 个工作线程和 1 个输出线程的情况下,工作人员交流未评估的工作,输出线程最终为他们完成工作。

我已经在各种文档中阅读了有关此问题的信息并查看了各种解决方案,但我只找到了一个有效的解决方案,而其余的则没有。下面是一些代码,其中工作线程开始一些可能需要很长时间的计算。我按降序启动线程,这样第一个线程应该花费最长的时间,后面的线程应该更早完成。

import Control.Concurrent (forkIO)
import Control.Concurrent.Chan   -- .Strict
import Control.Concurrent.MVar
import Control.Exception (finally, evaluate)
import Control.Monad (forM_)
import Control.Parallel.Strategies (using, rdeepseq)

main = (>>=) newChan $ (>>=) (newMVar []) . run

run :: Chan (Maybe String) -> MVar [MVar ()] -> IO ()
run logCh statVars = do
  logV <- spawn1 readWriteLoop
  say "START"
  forM_ [18,17..10] $ spawn . busyWork
  await
  writeChan logCh Nothing -- poison the logger
  takeMVar logV
  putStrLn "DONE"
  where
    say mesg = force mesg >>= writeChan logCh . Just

    force s = mapM evaluate s  -- works
--    force s = return $ s `using` rdeepseq  -- no difference
--    force s = return s -- no-op; try this with strict channel

    busyWork = say . show . sum . filter odd . enumFromTo 2 . embiggen
    embiggen i = i*i*i*i*i

    readWriteLoop = readChan logCh >>= writeReadLoop
    writeReadLoop Nothing = return ()
    writeReadLoop (Just mesg) = putStrLn mesg >> readWriteLoop

    spawn1 action = do
      v <- newEmptyMVar
      forkIO $ action `finally` putMVar v ()
      return v

    spawn action = do
      v <- spawn1 action
      modifyMVar statVars $ \vs -> return (v:vs, ())

    await = do
      vs <- modifyMVar statVars $ \vs -> return ([], vs)
      mapM_ takeMVar vs

使用大多数技术,结果按产生的顺序报告;也就是说,运行时间最长的计算在前。我将此解释为意味着输出线程正在完成所有工作:

-- results in order spawned (longest-running first = broken)
START
892616806655
503999185040
274877906943
144162977343
72313663743
34464808608
15479341055
6484436675
2499999999
DONE

我认为这个问题的答案将是严格的渠道,但他们没有奏效。我知道字符串的 WHNF 是不够的,因为这只会强制最外层的构造函数(字符串的第一个字符为 nil 或 cons)。应该完全评估rdeepseq,但它没有区别。我发现唯一可行的方法是映射Control.Exception.evaluate :: a -> IO a字符串中的所有字符。(请参阅force代码中的函数注释以了解几种不同的替代方案。)结果Control.Exception.evaluate如下:

-- results in order finished (shortest-running first = correct)
START
2499999999
6484436675
15479341055
34464808608
72313663743
144162977343
274877906943
503999185040
892616806655
DONE

那么为什么不严格渠道或rdeepseq产生这种结果呢?还有其他技术吗?我是否误解了为什么第一个结果被破坏了?

4

1 回答 1

5

这里有两个问题。

第一次尝试(使用显式rnf)不起作用的原因是,通过使用return,您创建了一个 thunk,它在评估时完全评估自身,但thunk 本身没有被评估。请注意,评估的类型是a -> IO a:它返回一个值的事实IO意味着evaluate可以强加排序:

return (error "foo")   >> return 1 == return 1
evaluate (error "foo") >> return 1 == error "foo"

结果是这段代码:

force s = evaluate $ s `using` rdeepseq

将起作用(如,具有与 相同的行为mapM_ evaluate s)。


使用严格通道的情况有点棘手,但我相信这是由于严格并发中的一个错误。昂贵的计算实际上是在工作线程上运行的,但它对你没有多大好处(你可以通过在你的字符串中隐藏一些异步异常并查看异常出现在哪个线程上来明确地检查这一点)。

什么是错误?让我们看一下 strict 的代码writeChan

writeChan :: NFData a => Chan a -> a -> IO ()
writeChan (Chan _read write) val = do
  new_hole <- newEmptyMVar
  modifyMVar_ write $ \old_hole -> do
    putMVar old_hole $! ChItem val new_hole
    return new_hole

我们看到在评估 thunk 之前modifyMVar_调用了它。write那么操作顺序是:

  1. writeChan被输入
  2. 我们takeMVar write(阻止任何想写信给频道的人)
  3. 我们评估昂贵的 thunk
  4. 我们将昂贵的 thunk 放到通道上
  5. 我们putMVar write,解除所有其他线程的阻塞

您看不到evaluate变体的这种行为,因为它们在获取锁之前执行评估。

我会就此向 Don 发送邮件,看看他是否同意这种行为不是最理想的。

唐同意这种行为是次优的。我们正在开发一个补丁。

于 2011-03-06T17:38:28.087 回答