7

我正在使用 STM 做事,并且在使用TBQueue数据结构方面取得了巨大成功。我一直在使用它的一个有用功能是根据 a 中的先决条件读取它TVar,基本上像这样:

shouldRead <- readTVar shouldReadVar
if shouldRead
  then do
    a <- readTBQueue queue
    doSomethingWith a
  else doSomethingElse

如果我们在执行这个块之前假设它queue是空的并且shouldReadVar包含True,它将导致readTBQueue调用,并且当包含或包含一个元素retry时,该块将被重新执行,无论先发生什么。shouldReadVarFalsequeue


我现在需要一个同步通道数据结构,类似于本文中描述的结构(如果你想理解这个问题,请阅读它),除了它需要像上一个例子那样带有前置条件,它是可读的,并可能与其他东西一起组成。

SyncChan让我们称其为定义了操作writeSyncChan的数据结构readSyncChan

这是一个可能的用例:这个(伪)代码(因为我混合了 STM/IO 概念而无法工作):

shouldRead <- readTVar shouldReadVar
if shouldRead
  then do
    a <- readSyncChan syncChan
    doSomethingWith a
  else doSomethingElse

假设当前没有其他线程阻塞writeSyncChan调用,并且shouldReadChancontains True,我希望块为“ ” ,retry直到shouldReadChancontains不同的线程阻塞在. 换句话说:当一个线程打开而另一个线程阻塞到达 a时,或者反之亦然,我希望该值沿通道传输。在所有其他情况下,双方都应该处于一种状态,从而对 的变化做出反应,从而可以取消读取或写入。FalsewriteSyncChanretrywriteSyncChanreadSyncChanretryshouldReadVar

T上面链接的文章中描述的使用两个 ( )的天真的方法MVar当然是不可能的。因为数据结构是同步的,所以不可能在两个atomically块内使用它,因为您不能在原子上下文中更改一个TMVar并等待另一个更改。TMVar

相反,我正在寻找一种部分原子性,在这种情况下,我可以“提交”事务的某个部分,并且只有在某些变量发生变化时才将其回滚,而其他变量则不能。如果我有像上面文章中的第一个示例一样的“msg”和“ack”变量,我希望能够写入“msg”变量,然后等待一个值到达“ack”,或者等待我的其他交易变量要改变。如果其他事务变量发生变化,则应重试整个原子块,并且如果“ack”值到达,则事务应像之前的状态一样继续。对于阅读方面,应该会发生类似的事情,除了我当然会从“msg”读取并写入“ack”。

这可以使用 GHC STM 来实现,还是我需要手动进行 MVar/回滚处理?

4

1 回答 1

3

这就是你想要的:

import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad

data SyncChan a = SyncChan (TMVar a) (TMVar ())

newSyncChan :: IO (SyncChan a)
newSyncChan = do
    msg <- newEmptyTMVarIO
    ack <- newEmptyTMVarIO
    return (SyncChan msg ack)

readIf :: SyncChan a -> TVar Bool -> STM (Maybe a)
readIf (SyncChan msg ack) shouldReadVar = do
    b <- readTVar shouldReadVar
    if b
        then do
            a <- takeTMVar msg
            putTMVar ack ()
            return (Just a)
        else return Nothing

write :: SyncChan a -> a -> IO ()
write (SyncChan msg ack) a = do
    atomically $ putTMVar msg a
    atomically $ takeTMVar ack

main = do
    sc <- newSyncChan
    tv <- newTVarIO True
    forkIO $ forever $ forM_ [False, True] $ \b -> do
        threadDelay 2000000
        atomically $ writeTVar tv b
    forkIO $ forM_ [0..] $ \i -> do
        putStrLn "Writing..."
        write sc i
        putStrLn "Write Complete"
        threadDelay 300000
    forever $ do
        putStrLn "Reading..."
        a <- atomically $ readIf sc tv
        print a
        putStrLn "Read Complete"

这给出了您想到的行为。而输入TVarTrue和输出端将相互同步。当TVar切换到时False,读取端自由中止并返回Nothing

于 2013-06-13T22:05:53.337 回答