3

我有以下并发线程管理器的实现

newtype Query = Query String

type ThreadWorker = (Query, ThreadStatus)
data ThreadStatus = Running | Finished | Threw IOException

newtype ThreadManager = Manager (MVar (M.Map ThreadId (MVar ThreadWorker)

我想写manageWorkers :: ThreadManager -> IO ()遍历并Map查看ThreadStatus.ThreadWorker

如果ThreadWorker完成,则将其从ThreadManager. 如果抛出了异常,那么应该处理它(打印到 stdout 就可以了)并且应该派生一个新线程来处理查询(假设存在一个函数runQuery :: Query -> IO a)并添加到ThreadManager,否则线程仍在运行,应该不理会。

我的第一次尝试是:

manageWorkers :: ThreadManager -> IO ()
manageWorkers (Manager mgr) =
    modifyMVar mgr $ \m -> do
        m' <- M.traverseWithKey manageWorker m
        return (m', ())
where manageWorker :: ThreadId -> MVar ThreadWorker -> IO (MVar ThreadWorker)
      manageWorker tid wkr = tryTakeMVar wkr >>= \mwkr ->
          case mwkr of
               Just (_, Finished) -> undefined -- need to delete this finished ThreadWorker
               Just (q, Threw e ) -> do
                   putStrLn ("[ERROR] " ++ show e)
                   tid' <- forkIO $ runQuery q
                   undefined -- need to add new ThreadWorker
               Just r -> newMVar r
               _ -> newEmptyMVar

但后来我被卡住了,似乎无法从/添加ThreadManagermanageWorker. 我不确定是否可以从traverse-like 函数中执行我想要的操作。

manageWorkers是否可以使用 my实现此功能,ThreadManager或者那里有更好的抽象?

编辑:在 ThomasM.DuBuisson 建议使用折叠时,我现在有以下内容

manageWorkers (Manager mgr) =
    modifyMVar mgr $ \m ->
    return (M.foldrWithKey manageWorker M.empty m, ())
where manageWorker :: ThreadId -> MVar ThreadWorker -> M.Map ThreadId (MVar ThreadWorker)
                      -> IO (M.Map ThreadId (MVar ThreadWorker))
      manageWorker tid wkr ts = tryTakeMVar wkr >>= \mwkr ->
          case mwkr of
              Just (q, Threw e) -> do
                  putStrLn ("[ERROR] " ++ show e)
                  wkr' <- newEmptyMVar
                  tid' <- forkIO $ runQuery q
                  return $ M.insert tid' wkr' ts
              Just (_, Running) -> return $ M.insert tid wkr
              _ -> return ts

唯一的问题是,显然manageWorker's 签名不能用M.foldrWithKey。我需要一个M.foldrWithKeyM :: Monad m => (k -> a -> b -> m b) -> b -> M.Map k a -> m b但这样的东西不存在,而且我自己编写它时遇到了麻烦。

显然,我可以unsafePerformIO用来逃避 IO monad 并满足编译器的要求,但我只会将其用作最后的手段。在这种情况下使用是否有意义unsafePerformIO

4

1 回答 1

1

在我看来,您的问题与并发无关。
您只想遍历一个映射,同时从映射中删除一些键或在映射中插入一些键并执行一些IO操作。
问题是traverseWithKey无法从映射中删除键或在映射中插入键,foldrWithKey无法执行IO操作。
你需要一个M.foldrWithKeyM :: Monad m => (k -> a -> b -> m b) -> b -> M.Map k a -> m b
的确,这样的事情是不存在的。但是,如果您查看 的文档foldrWithKey,其中指出:

foldrWithKey f z == foldr (uncurry f) z . toAscList. 

如果我们用 复制,我们可以猜测这样M.foldrWithKeyM可以组成。 以下是我的解决方案。 foldrfoldM

manageWorkers :: ThreadManager -> IO ()
manageWorkers (Manager mgr) = 
    modifyMVar mgr $ \m -> do
        m' <- foldM manageWorker m (M.toList m)
        return (m', ())
  where manageWorker :: M.Map ThreadId (MVar ThreadWorker) -> (ThreadId, MVar ThreadWorker) -> IO (M.Map ThreadId (MVar ThreadWorker))
        manageWorker ts (tid, wkr) = tryTakeMVar wkr >>= \mwkr ->
            case mwkr of
                 Just (_, Finished) -> return $ M.delete tid ts -- need to delete this finished ThreadWorker
                 Just (q, Threw e ) -> do
                       putStrLn ("[ERROR] " ++ show e)
                       wkr' <- newEmptyMVar
                       tid' <- forkIO $ runQuery q
                       return $ M.insert tid' wkr' ts -- need to add new ThreadWorker
                 _ -> return ts
于 2013-01-29T14:11:05.183 回答