我正在寻找一种 Haskell 设计,以某种方式组成一串单子动作(通常是 IO),以后的动作依赖于以前的动作,但在某些情况下可以在它们完成之前执行。


type Future m a = m (m a)


所以在某些链中a >>= b >>= cb 得到一个返回 a 的结果的动作。如果 b 评估此操作,它会等待 a 完成,否则它将并行运行。这也意味着,如果某个动作不需要前一个动作的结果作为参数,那么根据定义它不依赖于它,因此依赖关系是显式的。


date :: Future IO String   -- long process to find out the date
date = do
    print "attempting to get date"  -- will usually start some thread or process to compute the date
    return (print "today")  -- will wait for this thread or process and return the computed date

main = do
    d <- date   -- starts recieving the date
    print "foo" -- some other process
    d >>= print -- waits until the date has been computed and prints it out


"attempting to get date"

存在一个问题:如果一个动作决定等待前一个动作,它将始终依赖于之前的所有其他动作(在我的情况下)。但是在上面的例子中,如果 c 决定等待 b 但 b 没有决定等待 a,c 可能会在 a 完成之前开始,这不应该发生。


(>=>) :: Monad m => Future m a -> (m a -> Future m b) -> Future m b
a >=> f = do
    r1 <- a
    r2 <- f r1
    return (r1 >> r2)

所以这将结合“等待动作”并且a >=> b >=> c工作得很好,如果 c 等待 b 这个等待动作也会等待 a。然而,这种方法还有另一个问题(除此之外,您需要记住使用 >=> 而不是 >>=):可能会多次评估等待操作。如果 b 等待 a 并且 c 等待 b,则等待 b 将连接到等待 a,因此等待 a 将被执行两次。

实际问题在于 >=>:f r1可能会评估 r1 在这种情况下,它不需要在 return 语句中使用 r2 进行排序(因为它已经被执行,因此 a 已经完成)。但它也可能不是,我不知道。





backup :: Future IO ExitCode
backup = do
    pid <- startProcess "backup"
    return (waitForProcessAndGetExitCode pid)

当我现在链接类似的操作时backup >=> otherAction,otherAction 可以在备份运行时运行(总体上节省了很多时间)。但是 otherAction 可能需要完成备份,这种情况下它可以使用它的参数等待备份并检查是否成功。无论哪种方式,都必须执行备份。

我现在正在寻找一个很好的通用解决方案,最好不要与 IO monad 绑定。



(>>=) :: Future m a -> (a -> Future m b) -> Future m b


import Control.Concurrent
import Control.Monad
import Control.Monad.Trans

newtype Future m a = Future { runFuture :: m (m a) }

instance (Monad m) => Monad (Future m) where
    return = Future . return . return
    m >>= f = Future $ do
        fut1 <- runFuture m
        return $ join $ join $ liftM (runFuture . f) fut1

instance MonadTrans Future where
    lift = Future . liftM return

换句话说,Future它是一个 monad 转换器,它的实现没有什么是专门针对IOmonad 的。但是,以下示例将展示如何将它与IOmonad 结合使用来链接期货:

parallel :: IO a -> Future IO a
parallel m = Future $ do
    v <- newEmptyMVar
    forkIO $ m >>= putMVar v
    return $ takeMVar v

future1 = parallel $ do
    threadDelay 1000000
    putStrLn "Hello, World" 
    return 1
future2 n = parallel $ do
    threadDelay 1000000
    print n
    return 2
future3 = future1 >>= future2

main = do
    f <- runFuture future3
    putStrLn "I'm waiting..."
    r <- f
    print r

我还没有证明它满足 monad 定律或 monad 转换器定律,但我会尝试做到这一点,我会告诉你它是否检查出来。在那之前,那里的某个地方可能会放错join地方。


share :: IO a -> IO (IO a)
share m = do
    ref <- newIORef Nothing
    let reader = do
          cached <- readIORef ref
          case cached of
            Just a -> return a
            Nothing -> m >>= \a -> writeIORef ref (Just a) >> return a
    return reader

您可以share2 :: IO a -> IO a通过将 IORef 创建包装在 中来将其更改为unsafePerformIO,并且很容易推广到任何MonadIO实例。


mma >=> fab = return $ do
    ma <- mma
    b  <- fab ma


mma >=> fab = do
    ma <- mma
    return $ do
        b <- fab ma
对于某些情况,当您需要按需执行操作时,您可能会发现此代码很有用 未在 GHC 中检查,但在拼写错误修复后应该可以工作

module Promise (SuspendedAction, createSuspendedAction, getValueFromSuspendedAction)
import Data.IORef

data Promise a = Suspended (IO a) | Done a

data SuspendedAction = SA (IORef (Promise a))

createSuspendedAction :: m a -> m (SuspendedAction a)
createSuspendedAction act = newIORef (Suspended act)

readSuspendedAction :: SuspendedAction a -> m a
readSuspendedAction (SA ref) = readIORef ref >>= \suspended -> case suspended of
  Done a -> return a
  Suspended sact -> sact >>= \rv -> writeIORef ref (Done rv) >> return rv

顺便说一句,仔细检查 hackage,有一个包允许在尊重其顺序的同时懒惰地执行 IO 操作。

a :: Process IO x ()
a = independant $ do
    print "start a"
    return $ print "end a"

b :: Process IO x Int
b = independant $ do
    print "start b"
    return $ print "end b" >> return 0

c :: Process IO Int ()
c = dependant $ \x -> do
    print $ "start c with " ++ show x
    return $  print ("end c, started with " ++ show x)

chain = a >~ b >~ c
main = exec chain

-- outputs:
"start a" "start b" "end a" "end b" "start c with 0" "end c, started with 0"



type Future m a = m (m a)
type Action m a b = a -> Future m b
type Process m a b = forall c. Action m c a -> Action m c b  -- will need -XRank2Types


-- sequences f after g, f is dependant of g and gets its result
-- dependant :: Monad m => Action m a b -> Action m c a -> Action c b
dependant :: Monad m => Action m a b -> Process m a b
dependant f g a = join (g a) >>= f

-- sequences f after g, f is independant of g
independant :: Monad m => Future m a -> Process m b a
independant f g a = do
    w1 <- g a
    w2 <- f
    return (w1 >> w2)

-- concenation of processes
(>~) = flip (.)


-- lifts a pure function into an action
pureA :: Monad m => (a -> b) -> Action m a b
pureA f a = return . return $ f a

-- makes an action wich always returns the same result
constA :: Monad m => b -> Action m a b
constA = pureA . const

-- no operation action
nop :: Monad m => Action m a ()
nop = constA ()

-- puts a sequence point
wait :: Monad m => Process m a a
wait = dependant $ pureA id

-- modify its result with a pure function
modify :: (Monad m, Functor m) => (a -> b) -> Process m a b
modify f act a = do
    x <- act a
    return (fmap f x)

-- makes a process, wich always returns the same result
constP :: (Monad m, Functor m) => b -> Process m a b
constP = modify . const


-- executes a process
exec :: Monad m => Process m () b -> m b
exec p = join $ p nop undefined


simleI :: String -> a -> Process IO b a
simpleI name r = independant $ do
    print ("start " ++ name)
    return $ print ("end " ++ name) >> return r

simpleD :: (Show a, Show b) => String -> (a -> b) -> Process IO a b
simpleD name f = dependant $ \a -> do
    print ("start " ++ name ++ " with " ++ show a)
    let r = f a
    return $ print ("end " ++ name ++ " with " ++ show r ++ " (started with " ++ show a ++ ")") >> return r

a = simpleI "a" ()
b = simpleI "b" 42
c = simpleD "c" (+1)
d = simpleI "d" ()

chain1 = a >~ b >~ c >~ d       -- == d . c . b . a
chain2 = a >~ wait >~ b >~ c >~ d
chain3 = a >~ b >~ modify (+1) >~ c >~ d

main = do
    exec chain1
    print "---"
    exec chain2
    print "---"
    exec chain3


"start a"
"start b"
"end a"
"end b"
"start c with 42"
"start d"
"end c with 43 (started with 42)"
"end d"
"start a"
"end a"
"start b"
"end b"
"start c with 42"
"start d"
"end c with 43 (started with 42)"
"end d"
"start a"
"start b"
"end a"
"end b"
"start c with 43"
"start d"
"end c with 44 (started with 43)"
"end d"


我有点好奇如何对 Action 和 Process 进行分类。它们不是单子。他们可能是箭,但我对箭太陌生,无法分辨。进程可能是一个带有 fmap = modify 和 pure = const 的 Applicative 。constA 或类似的东西。


