我正在向这个 Haskell 服务器发送简单的 UDP 数据包。对于数据包的来源,我使用“aspell -l en dump master”生成的纯文本文件。但是,任何超过 120,000 条消息的列表都应该有效。如果我同时启动消费者和生产者,我不会丢失数据包。但是,我希望能够模拟一个非常忙碌的消费者。如果我在启动消费者之前引入一个 threadDelay 20 秒,我会丢失数据包。这对我来说是反直觉的,因为当我延迟消费时,我在标准输出和磁盘 IO 方面做得更少。谁能解释为什么我会因为延迟版本而感到迷茫?当我的消费者非常忙碌时,如何更好地管理套接字和 TChan 以不造成任何损失(只是更高的内存使用率)?
import Control.Monad (forever)
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM (writeTChan, readTChan, atomically)
import Control.Concurrent.STM.TChan
import Network.Socket hiding (send, sendTo, recv, recvFrom)
import Network.Socket.ByteString
import Data.ByteString hiding(putStrLn, head)
import qualified Data.ByteString.Char8 as Char8 (putStrLn, putStr)
import System.IO
main :: IO ()
main = withSocketsDo $ do
hSetBuffering stdout NoBuffering
addrinfos <- getAddrInfo
(Just (defaultHints {addrFlags = [AI_PASSIVE]}))
Nothing (Just "2000")
let serveraddr = head addrinfos
sock <- socket (addrFamily serveraddr) Datagram defaultProtocol
bindSocket sock (addrAddress serveraddr)
chan <- newTChanIO
forkIO(producer chan sock)
-- Uncomment the threadDelay below to see lossy version
-- threadDelay (1000000 * 20)
forkIO(consumer chan)
forever $ threadDelay (1000000 * 60)
producer :: TChan ByteString -> Socket -> IO ()
producer chan sock = forever $ do
(msg) <- recv sock 256
atomically $ writeTChan chan msg
consumer :: TChan ByteString -> IO ()
consumer chan = forever $ do
msg <- atomically $ readTChan chan
Char8.putStr msg