12

epoll我想使用“ ”风格的事件管理来实现高效的单线程套接字通信。

如果我要“从头开始”编写一个非常命令式的程序,我基本上会这样做(只是我刚刚输入的一些伪代码 - 可能无法编译):

import Control.Concurrent

import Data.ByteString (ByteString)
import qualified Data.ByteString as ByteString

import qualified GHC.Event as Event

import Network
import Network.Socket
import Network.Socket.ByteString

main = withSocketFromSomewhere $ \ socket -> do
  let fd = fromIntegral . fdSocket $ socket

  -- Some app logic
  state <- newMVar "Bla"

  -- Event manager
  manager <- Event.new

  -- Do an initial write
  initialWrite socket state manager

  -- Manager does its thing
  Event.loop manager

write manager socket bs =
  -- Should be pretty straight-forward
  Event.registerFd manager theWrite fd Event.evtWrite
  where
    fd = fromIntegral . fdSocket $ socket
    theWrite key _ = do
      Event.unregisterFd manager key
      sendAll socket bs

read manager socket cont =
  -- Ditto
  Event.registerFd manager theRead fd Event.evtRead
  where
    fd = fromIntegral . fdSocket $ socket
    theRead key _ = do
      Event.unregisterFd manager key
      bs <- recv socket 4096
      cont bs

initialWrite socket state manager = do
  msg <- readMVar state
  write manager socket msg
  read manager socket $ \ bs -> do
    ByteString.putStrLn bs
    putMVar state msg

想象一下,还有一些函数可以向管理器添加超时事件,等等。

但是,这段代码并不是特别好,原因如下:

  1. 我手动携带事件管理器。
  2. 我必须MVar为我的应用程序逻辑使用一个,因为我不能告诉不透明的事件管理器它应该为我传递一些状态,即使我知道它只使用一个线程,因此可能被用作基础一个单子变压器堆栈。
  3. 我必须为读取创建明确的分隔延续(我什至可能必须为写入执行此操作;我不知道在这种情况下什么会更明智)。

现在,这只是为使用过多的单子变压器等而尖叫。我希望能够做到这一点:

main =
  withSocketFromSomewhere $ \ socket ->
  runEvents . flip runStateT "Bla" $ initialWrite socket

initialWrite socket = do
  msg <- lift get
  write socket msg
  resp <- read socket
  liftIO $ ByteString.putStrLn resp
  lift $ put msg

此代码应具有与上述代码相同的行为;例如,通过暂停计算直到resp <- read socket在线接收到读取,并让我在同一个线程/管理器上管理多个套接字。

问题:

  1. GHC 事件 API/libevent/equivalent 是否有更高级的接口,可以为用户提供更多功能?考虑最近 GHC 中发生的同步 IO 调度更改是否值得(我在 7.4.1 上)?
  2. 如果我想实现协作并发,例如拥有一个始终处理来自套接字的读取的函数,但让该函数与写入“线程”共享相同的状态单子,该怎么办?这可以用(1)中的任何解决方案来完成吗?
4

1 回答 1

22

我强烈建议您阅读基于语言的统一事件和线程的方法。它讨论了如何在您选择的 IO 子系统之上构建您想要的任何并发系统,并且在他们的论文中,他们实际上在epoll.

不幸的是,论文中的数据类型和代码示例非常糟糕,并且花了一些时间(至少对我来说)对他们的代码进行逆向工程,他们的论文中甚至存在一些错误。然而,他们的方法实际上是一种非常强大且通用的方法的子集,称为“自由单子”。

例如,它们的Trace数据类型只是变相的自由单子。要了解原因,让我们参考一下 Free monad 的 Haskell 定义:

data Free f r = Pure r | Free (f (Free f r))

一个自由的 monad 就像一个“函子列表”,Pure它类似于一个列表的Nil构造函数,Free也类似于一个列表的 Cons构造函数,因为它在“列表”上添加了一个额外的函子。从技术上讲,如果我是迂腐的,没有什么说必须将自由 monad 实现为上述类似列表的数据类型,但无论您实现什么都必须与上述数据类型同构。

免费 monad 的好处在于,给定一个 functor fFree f它自动成为一个 monad:

instance (Functor f) => Monad (Free f) where
    return = Pure
    Pure r >>= f = f r
    Free x >>= f = Free (fmap (>>= f) x)

这意味着我们可以将它们的Trace数据类型分解为两部分,基本函子f,然后是由 生成的自由 monad f

-- The base functor
data TraceF x =
    SYS_NBIO (IO x)
  | SYS_FORK x x
  | SYS_YIELD x
  | SYS_RET
  | SYS_EPOLL_WAIT FD EPOLL_EVENT x

-- You can even skip this definition if you use the GHC
-- "DerivingFunctor" extension
instance Functor TraceF where
    fmap f (SYS_NBIO x) = SYS_NBIO (liftM f x)
    fmap f (SYS_FORK x) = SYS_FORK (f x) (f x)
    fmap f (SYS_YIELD x) = SYS_YIELD (f x)
    fmap f SYS_RET = SYS_RET
    fmap f (SYS_EPOLL_WAIT FD EPOLL_EVENT x) = SYS_EPOLL_WAIT FD EPOLL_EVEN (f x)

鉴于该函子,您可以Trace“免费”获得 monad:

type Trace a = Free TraceF a
-- or: type Trace = Free TraceF

...虽然这不是它被称为“免费”单子的原因。

然后更容易定义它们的所有功能:

liftF = Free . fmap Pure
-- if "Free f" is like a list of "f", then
-- this is sort of like: "liftF x = [x]"
-- it's just a convenience function

-- their definitions are written in continuation-passing style,
-- presumably for efficiency, but they are equivalent to these
sys_nbio io = liftF (SYS_NBIO io)
sys_fork t = SYS_FORK t (return ()) -- intentionally didn't use liftF
sys_yield = liftF (SYS_YIELD ())
sys_ret = liftF SYS_RET
sys_epoll_wait fd event = liftF (SYS_EPOLL_WAIT fd event ())

因此,您可以像使用 monad 一样使用这些命令:

myTrace fd event = do
    sys_nbio (putStrLn "Hello, world")
    fork $ do
        sys_nbio (putStrLn "Hey")
    sys_expoll_wait fd event

现在,这是关键概念。我刚刚写的那个 monad 只创建了一个数据类型。而已。它根本不解释它。这就像为表达式编写抽象语法树一样。如何评估它完全取决于您。在论文中,他们给出了表达式解释器的具体示例,但编写自己的解释器很简单。

重要的概念是这个解释器可以在你想要的任何 monad 中运行。因此,如果您想通过并发处理某些状态,您可以这样做。例如,这是一个玩具解释器,它使用 StateT IO monad 来跟踪调用 IO 操作的次数:

interpret t = case t of
    SYS_NBIO io -> do
        modify (+1)
        t' <- lift io
        interpret t'
    ...

你甚至可以在 forkIO 的动作中线程化 monad!这是我的一些非常古老的代码,它是错误的和蹩脚的,因为它是在我没有经验并且不知道什么是自由单子的时候写回来的,但它证明了这一点:

module Thread (Thread(..), done, lift, branch, fork, run) where

import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad.Cont
import Data.Sequence
import qualified Data.Foldable as F

data Thread f m =
    Done
  | Lift (m (Thread f m))
  | LiftIO (IO (Thread f m))
  | Branch (f (Thread f m))
  | Exit

done = cont $ \c -> Done
lift' x = cont $ \c -> Lift $ liftM c x
liftIO' x = cont $ \c -> LiftIO $ liftM c x
branch x = cont $ \c -> Branch $ fmap c x
exit = cont $ \c -> Exit

fork x = join $ branch [return (), x >> done]

run x = do
    q <- liftIO $ newTChanIO
    enqueue q $ runCont x $ \_ -> Done
    loop q
  where
    loop q = do
        t <- liftIO $ atomically $ readTChan q
        case t of
            Exit -> return ()
            Done -> loop q
            Branch ft -> mapM_ (enqueue q) ft >> loop q
            Lift mt -> (mt >>= enqueue q) >> loop q
            LiftIO it -> (liftIO $ forkIO $ it >>= enqueue q) >> loop q
    enqueue q = liftIO . atomically . writeTChan q

自由单子背后的要点是它们提供单子实例而不是其他任何东西。换句话说,它们退后一步,让你完全自由地解释它们,这就是它们如此有用的原因。

于 2012-04-28T17:07:00.077 回答