10

我有一个 TChan 作为线程的输入,它的行为应该是这样的:

如果 sombody 在特定时间内写入 TChan,则应检索内容。如果在指定时间内没有写入任何内容,则应解除阻塞并继续Nothing

我对此的尝试是使用这样的超时功能System.Timeout

timeout 1000000 $ atomically $ readTChan pktChannel

这似乎有效,但现在我发现,我有时会丢失数据包(它们被写入通道,但在另一侧没有读取。在日志中我得到这个:

2014.063.11.53.43.588365 Pushing Recorded Packet: 2 1439
2014.063.11.53.43.592319 Run into timeout
2014.063.11.53.44.593396 Run into timeout
2014.063.11.53.44.593553 Pushing Recorded Packet: 3 1439
2014.063.11.53.44.597177 Sending Recorded Packet: 3 1439

其中“Pushing Recorded Packet”是来自一个线程的写入,“Sending Recorded Packet”是从发送者线程中的 TChan 读取。缺少带有 的行Sending Recorded Packet 2 1439,这表明从 TChan 读取成功。

似乎如果在错误的时间点收到超时,通道就会丢失数据包。我怀疑threadKill里面使用的功能timeout和STM一起玩不好。

这个对吗?有人有另一种不会丢失数据包的解决方案吗?

4

3 回答 3

8

使用registerDelaySTM 函数,在TVar达到超时时向 a 发出信号。然后,您可以使用orElse函数或Alternative运算符<|>在下一个TChan值或超时之间进行选择。

import Control.Applicative
import Control.Monad
import Control.Concurrent
import Control.Concurrent.STM
import System.Random

-- write random values after a random delay
packetWriter :: Int -> TChan Int -> IO ()
packetWriter maxDelay chan = do
  let xs = randomRs (10000 :: Int, maxDelay + 50000) (mkStdGen 24036583)
  forM_ xs $ \ x -> do
    threadDelay x
    atomically $ writeTChan chan x

-- block (retry) until the delay TVar is set to True
fini :: TVar Bool -> STM ()
fini = check <=< readTVar

-- Read the next value from a TChan or timeout
readTChanTimeout :: Int -> TChan a -> IO (Maybe a)
readTChanTimeout timeoutAfter pktChannel = do
  delay <- registerDelay timeoutAfter
  atomically $
        Just <$> readTChan pktChannel
    <|> Nothing <$ fini delay

-- | Print packets until a timeout is reached
readLoop :: Show a => Int -> TChan a -> IO ()
readLoop timeoutAfter pktChannel = do
  res <- readTChanTimeout timeoutAfter pktChannel
  case res of
    Nothing -> putStrLn "timeout"
    Just val -> do
      putStrLn $ "packet: " ++ show val
      readLoop timeoutAfter pktChannel

main :: IO ()
main = do
  let timeoutAfter = 1000000

  -- spin up a packet writer simulation
  pktChannel <- newTChanIO
  tid <- forkIO $ packetWriter timeoutAfter pktChannel

  readLoop timeoutAfter pktChannel

  killThread tid
于 2014-03-04T14:54:18.187 回答
2

并发的经验法则是:如果在 IO 操作中的某个点添加睡眠很重要,那么您的程序是不安全的。

要了解代码为什么timeout 1000000 $ atomically $ readTChan pktChannel不起作用,请考虑以下替代实现atomically

atomically' :: STM a -> IO a
atomically' action = do
  result <- atomically action
  threadDelay someTimeAmount
  return result

以上等于atomically,但是为了额外的无辜延迟。现在很容易看出,如果timeout在 期间杀死线程threadDelay,则原子操作已完成(从通道消耗消息),但timeout将返回Nothing

一个简单的修复timeout n $ atomically ...可能是以下

smartTimeout :: Int -> STM a -> IO (Maybe a)
smartTimeout n action = do
   v <- atomically $ newEmptyTMvar
   _ <- timeout n $ atomically $ do
          result <- action
          putTMvar v result
   atomically $ tryTakeTMvar v

上面使用了一个额外的事务变量v来做到这一点。动作的结果值存储v 运行动作的同一原子块内。timeout 的返回值是不可信的,因为它不会告诉我们动作是否运行。之后,我们检查 TMVar v,当且仅当运行时它才会满action

于 2014-03-04T20:17:15.850 回答
1

而不是TChan a,使用TChan (Maybe a). 你的正常生产者(的x)现在写Just x。分叉一个额外的“滴答”进程,写入Nothing通道(每 x 秒)。然后为频道设置一个阅读器,如果连续获得两个 则中止Nothing。这样,您可以避免异常,这可能会导致数据在您的情况下丢失(但我不确定)。

于 2014-03-04T14:43:27.433 回答