6

我正在向这个 Haskell 服务器发送简单的 UDP 数据包。对于数据包的来源,我使用“aspell -l en dump master”生成的纯文本文件。但是,任何超过 120,000 条消息的列表都应该有效。如果我同时启动消费者和生产者,我不会丢失数据包。但是,我希望能够模拟一个非常忙碌的消费者。如果我在启动消费者之前引入一个 threadDelay 20 秒,我会丢失数据包。这对我来说是反直觉的,因为当我延迟消费时,我在标准输出和磁盘 IO 方面做得更少。谁能解释为什么我会因为延迟版本而感到迷茫?当我的消费者非常忙碌时,如何更好地管理套接字和 TChan 以不造成任何损失(只是更高的内存使用率)?

import Control.Monad (forever)
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM (writeTChan, readTChan, atomically)
import Control.Concurrent.STM.TChan
import Network.Socket hiding (send, sendTo, recv, recvFrom)
import Network.Socket.ByteString
import Data.ByteString hiding(putStrLn, head)
import qualified Data.ByteString.Char8 as Char8 (putStrLn, putStr)
import System.IO

main :: IO ()
main = withSocketsDo $  do
    hSetBuffering stdout NoBuffering
    addrinfos <- getAddrInfo
                 (Just (defaultHints {addrFlags = [AI_PASSIVE]}))
                 Nothing (Just "2000")
    let serveraddr = head addrinfos
    sock <- socket (addrFamily serveraddr) Datagram defaultProtocol
    bindSocket sock (addrAddress serveraddr)
    chan <- newTChanIO
    forkIO(producer chan sock)
    -- Uncomment the threadDelay below to see lossy version
    -- threadDelay (1000000 * 20)
    forkIO(consumer chan)
    forever $ threadDelay (1000000 * 60)

producer :: TChan ByteString -> Socket -> IO ()
producer chan sock = forever $ do
    (msg) <- recv sock 256
    atomically $ writeTChan chan msg

consumer :: TChan ByteString -> IO ()
consumer chan = forever $ do
    msg <- atomically $ readTChan chan
    Char8.putStr msg
4

1 回答 1

1

为什么您不会期望数据丢失?这在任何语言中都会发生,不仅仅是 Haskell,因为内核对于每个套接字只有有限的缓冲区空间。因为与 TCP 不同,UDP 不受流量控制,内核只会丢弃数据包。您当然可以使用 SO_RCVBUF 套接字选项增加缓冲区大小。但从根本上说,UDP 无论如何都不可靠,因此任何使用它的实际应用程序都需要处理数据包丢失。

于 2015-08-02T23:46:43.150 回答