我有一个写入 Map 和 PSQ 的主线程。在 Map 和 PSQ 中,我使用相同的键,因此通过查看 PSQ,可以找到具有最小优先级的条目,复杂度为 O(1),并映射到 Map 中的值。
现在,虽然我的主线程在需要时添加/修改了 Map 和 PSQ,但我有第二个线程不断(forever $ do
)查看 PSQ 以确定最旧的密钥何时是 N 毫秒前,然后应该刷新它。
为此,两个线程都需要查看相同的可变数据。这里保持状态的最佳方法是什么?这会是 IOREfs 的情况吗?还有什么可能的方法来解决这个问题?
“一些”预阿尔法代码在这里:
import Data.Time
import Data.Functor
import Data.Time.Clock.POSIX
import qualified Data.PSQueue as PSQ
import qualified Data.Map as Map
import Data.Maybe
import Control.Concurrent
import Control.Concurrent.MVar
import Control.Monad
import Network.Socket hiding (send, sendTo, recv, recvFrom)
import Network.Socket.ByteString
import qualified Data.ByteString.Char8 as B
--PSQ = (host, PID) POSIXTime
--where the tuple is k and POSIXTime is p
--Map is (host, PortNumber) [messages]
--where the tuple is the key and [messages] is a list of messages
key = ("192.168.1.1", 4711)
messages = ["aaa", "bbbb", "ccccc"]
newRq :: IO ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String]))
newRq = do
time <- getPOSIXTime
let q = PSQ.singleton key time
let m = Map.singleton key messages
return (q, m)
appendMsg :: String -> (String, Integer) -> Map.Map (String, Integer) [String] -> Map.Map (String, Integer) [String]
appendMsg newmsgs (host, port) m =
let Just messages' = Map.lookup (host,port) m
l = length . concat $ messages'
l' = l + length newmsgs
in
if l' < 1400 then Map.adjust (++ [newmsgs]) (host, port) m else m
insertNewRec :: (String, Integer) -> [String] -> PSQ.PSQ (String, Integer) POSIXTime -> Map.Map (String, Integer) [String] -> IO ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String]))
insertNewRec (a,b) c q m = do
time <- getPOSIXTime
let q1 = PSQ.insert (a,b) time q
let m1 = Map.insert (a,b) c m
return (q1, m1)
sendq :: Socket -> B.ByteString -> String -> PortNumber -> IO ()
sendq s datastring host port = do
hostAddr <- inet_addr host
sendAllTo s datastring (SockAddrInet port hostAddr)
return ()
deleteRec :: (String, Integer) -> PSQ.PSQ (String, Integer) POSIXTime -> Map.Map (String, Integer) [String] -> ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String]))
deleteRec (host, port) q m = (q', m')
where
m' = Map.delete (host, port) m
q' = PSQ.delete (host, port) q
loopMyQ q m1 done = forever $ do
let Just m = PSQ.findMin q
let time = (PSQ.prio m) + 0.200 --adds 200ms
now <- getPOSIXTime
if now < time
then print (m1)
--here eventually I would call the send function to flush the queue
else putMVar done ()
sendrecv :: Socket -> PSQ.PSQ (String, Integer) POSIXTime -> Map.Map (String, Integer) [String] -> String -> IO ((PSQ.PSQ (String, Integer) POSIXTime), (Map.Map (String, Integer) [String]))
sendrecv s q1 m1 msg = do
let m2 = appendMsg msg key m1
(q3, m3) = case m2 of
val | m2 == m1 -> deleteRec key q1 m1
| otherwise -> (q1, m2)
(q5, m5) <- if (m2 == m1) then (do (q4, m4) <- insertNewRec key (words msg) q3 m3
return (q4, m4)) else return (q1, m2)
when (m2 == m1) (let Just messages = Map.lookup ("192.168.1.1", 4711) m1 in sendq s (B.pack $ unwords messages) "192.168.1.1" 4711)
return (q5, m5)
--main :: IO()
main = withSocketsDo $ do
s <- socket AF_INET Datagram defaultProtocol
(q1, m1) <- newRq
done <- newEmptyMVar
forkIO $ loopMyQ q1 m1 done
(q', m') <- foldM (\(q, m) _ -> sendrecv s q m "ping") (q1, m1) [1..1000]
takeMVar done
--print ("longer than 200ms ago")