2

我有一个写入 Map 和 PSQ 的主线程。在 Map 和 PSQ 中,我使用相同的键,因此通过查看 PSQ,可以找到具有最小优先级的条目,复杂度为 O(1),并映射到 Map 中的值。

现在,虽然我的主线程在需要时添加/修改了 Map 和 PSQ,但我有第二个线程不断(forever $ do)查看 PSQ 以确定最旧的密钥何时是 N 毫秒前,然后应该刷新它。

为此,两个线程都需要查看相同的可变数据。这里保持状态的最佳方法是什么?这会是 IOREfs 的情况吗?还有什么可能的方法来解决这个问题?

“一些”预阿尔法代码在这里:

import Data.Time
import Data.Functor
import Data.Time.Clock.POSIX
import qualified Data.PSQueue as PSQ
import qualified Data.Map as Map
import Data.Maybe
import Control.Concurrent
import Control.Concurrent.MVar
import Control.Monad
import Network.Socket hiding (send, sendTo, recv, recvFrom)
import Network.Socket.ByteString
import qualified Data.ByteString.Char8 as B 

--PSQ = (host, PID) POSIXTime
--where the tuple is k and POSIXTime is p

--Map is (host, PortNumber) [messages]
--where the tuple is the key and [messages] is a list of messages

key = ("192.168.1.1", 4711)
messages = ["aaa", "bbbb", "ccccc"]

newRq :: IO ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String]))
newRq = do
      time <- getPOSIXTime
      let q = PSQ.singleton key time
      let m = Map.singleton key messages
      return (q, m)

appendMsg :: String -> (String, Integer) -> Map.Map (String, Integer) [String] -> Map.Map (String, Integer) [String]
appendMsg newmsgs (host, port) m =
      let Just messages' = Map.lookup (host,port) m
          l = length . concat $ messages'
          l' = l + length newmsgs
      in 
      if l' < 1400 then Map.adjust (++ [newmsgs]) (host, port) m else m

insertNewRec :: (String, Integer) -> [String] -> PSQ.PSQ (String, Integer) POSIXTime -> Map.Map (String, Integer) [String] -> IO ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String]))
insertNewRec (a,b) c q m = do
      time <- getPOSIXTime
      let q1 = PSQ.insert (a,b) time q
      let m1 = Map.insert (a,b) c m
      return (q1, m1)

sendq :: Socket -> B.ByteString -> String -> PortNumber -> IO ()
sendq s datastring host port = do
      hostAddr <- inet_addr host
      sendAllTo s datastring (SockAddrInet port hostAddr)
      return ()

deleteRec :: (String, Integer) -> PSQ.PSQ (String, Integer) POSIXTime -> Map.Map (String, Integer) [String] -> ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String]))
deleteRec (host, port) q m = (q', m')
      where 
         m' = Map.delete (host, port) m
         q' = PSQ.delete (host, port) q

loopMyQ q m1 done = forever $ do 
      let Just m = PSQ.findMin q
      let time = (PSQ.prio m) + 0.200 --adds 200ms
      now <- getPOSIXTime
      if now < time
        then print (m1) 
        --here eventually I would call the send function to flush the queue
        else putMVar done ()

sendrecv :: Socket -> PSQ.PSQ (String, Integer) POSIXTime -> Map.Map (String, Integer) [String] -> String -> IO ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String])) 
sendrecv s q1 m1 msg = do
     let m2 = appendMsg msg key m1
         (q3, m3) = case m2 of   
                   val | m2 == m1 -> deleteRec key q1 m1
                       | otherwise -> (q1, m2)
     (q5, m5) <- if (m2 == m1) then (do (q4, m4) <- insertNewRec key (words msg) q3 m3
                                        return (q4, m4)) else return (q1, m2)
     when (m2 == m1) (let Just messages = Map.lookup ("192.168.1.1", 4711) m1 in sendq s (B.pack $ unwords messages) "192.168.1.1" 4711)
     return (q5, m5)

--main :: IO()
main = withSocketsDo $ do
     s <- socket AF_INET Datagram defaultProtocol
     (q1, m1) <- newRq
     done <- newEmptyMVar
     forkIO $ loopMyQ q1 m1 done
     (q', m') <- foldM (\(q, m) _ -> sendrecv s q m "ping") (q1, m1) [1..1000]
     takeMVar done
     --print ("longer than 200ms ago")
4

1 回答 1

6

您很可能希望使用MVarTVar来跨线程保持一致的状态。IORefs 不是线程安全的。

我建议使用 STM(和 TVars)来解决这个问题。您正在处理对多个数据结构的并发访问,而 STM 的可组合性比考虑使用 MVar 的锁定顺序要容易得多。

查看您的代码后,似乎 TVar 将是您最好的选择。将您的 PSQ 和 Map 包装在两个不同的 TVar 中。atomically在事务中包装所有需要一致视图的代码。在大多数情况下,您的代码将“正常工作”。但是,如果存在对锁的争用,原子块将被重试,直到它工作为止。

于 2012-02-20T22:17:17.643 回答