62

在我GHC Haskell使用 stm、network-conduit 和管道的应用程序中,每个套接字都有一个链,它使用runTCPServer. Strands 可以通过使用广播 TChan 与其他 strands 进行通信。

这展示了我想如何设置管道“链”:

在此处输入图像描述

所以,我们这里有两个源(每个都绑定到辅助管道),它们产生一个Packet对象,该对象encoder将接受并变成ByteString,然后发出套接字。我在两个输入的有效融合(性能是一个问题)方面遇到了很大的困难。

如果有人能指出我正确的方向,我将不胜感激。


由于我不尝试就发布这个问题是不礼貌的,所以我将把我之前尝试过的内容放在这里;

我已经编写/挑选了一个函数,该函数(阻塞)从 TMChan(可关闭通道)产生 Source;

-- | Takes a generic type of STM chan and, given read and close functionality,
--   returns a conduit 'Source' which consumes the elements of the channel.
chanSource 
    :: (MonadIO m, MonadSTM m)
    => a                    -- ^ The channel
    -> (a -> STM (Maybe b)) -- ^ The read function
    -> (a -> STM ())        -- ^ The close/finalizer function
    -> Source m b
chanSource ch readCh closeCh = ConduitM pull
    where close     = liftSTM $ closeCh ch
          pull      = PipeM $ liftSTM $ readCh ch >>= translate
          translate = return . maybe (Done ()) (HaveOutput pull close)

同样,将 Chan 转换为 sink 的函数;

-- | Takes a stream and, given write and close functionality, returns a sink
--   which wil consume elements and broadcast them into the channel 
chanSink
    :: (MonadIO m, MonadSTM m)
    => a                 -- ^ The channel
    -> (a -> b -> STM()) -- ^ The write function
    -> (a -> STM())      -- ^ The close/finalizer function
    -> Sink b m ()
chanSink ch writeCh closeCh = ConduitM sink
    where close  = const . liftSTM $ closeCh ch
          sink   = NeedInput push close
          write  = liftSTM . writeCh ch
          push x = PipeM $ write x >> return sink

那么mergeSources 就很简单了;fork 2 个线程(我真的不想这样做,但到底是什么),可以将他们的新项目放入一个列表中,然后我会生成一个列表;

-- | Merges a list of 'Source' objects, sinking them into a 'TMChan' and returns
--   a source which consumes the elements of the channel.
mergeSources
    :: (MonadIO m, MonadBaseControl IO m, MonadSTM m)
    => [Source (ResourceT m) a]             -- ^ The list of sources
    -> ResourceT m (Source (ResourceT m) a)
mergeSources sx = liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn
    where push c s = s $$ chanSink c writeTMChan closeTMChan
          fsrc x c = mapM_ (\s -> resourceForkIO $ push c s) x
          retn c   = return $ chanSource c readTMChan closeTMChan

虽然我成功地对这些函数进行了类型检查,但我没有成功地利用这些函数进行类型检查;

-- | Helper which represents a conduit chain for each client connection
serverApp :: Application SessionIO
serverApp appdata = do
    use ssBroadcast >>= liftIO . atomically . dupTMChan >>= assign ssBroadcast
    -- appSource appdata $$ decoder $= protocol =$= encoder =$ appSink appdata
    mergsrc $$ protocol $= encoder =$ appSink appdata
    where chansrc = chanSource (use ssBroadcast) readTMChan closeTMChan
          mergsrc = mergeSources [appSource appdata $= decoder, chansrc]

-- | Structure which holds mutable information for clients
data SessionState = SessionState
    { _ssBroadcast     :: TMChan Packet -- ^ Outbound packet broadcast channel
    }

makeLenses ''SessionState

-- | A transformer encompassing both SessionReader and SessionState
type Session m = ReaderT SessionReader (StateT SessionState m)

-- | Macro providing Session applied to an IO monad
type SessionIO = Session IO

无论如何,我认为这种方法存在缺陷——有许多中间列表和转换。这对性能不利。寻求指导。


PS。据我所知,这不是重复的;将具有多个输入的管道融合在一起,因为在我的情况下,两个源都产生相同的类型,我不在乎从哪个源产生Packet对象,只要我不等待一个而另一个有准备使用的对象。

聚苯乙烯。对于示例代码中 Lens 的使用(以及知识要求),我深表歉意。

4

1 回答 1

1

我不知道这是否有任何帮助,但我尝试实施 Iain 的建议,并在mergeSources'任何渠道完成后立即停止该变体:

mergeSources' :: (MonadIO m, MonadBaseControl IO m)
              => [Source (ResourceT m) a] -- ^ The sources to merge.
              -> Int -- ^ The bound of the intermediate channel.
              -> ResourceT m (Source (ResourceT m) a)
mergeSources' sx bound = do
    c <- liftSTM $ newTBMChan bound
    mapM_ (\s -> resourceForkIO $
                    s $$ chanSink c writeTBMChan closeTBMChan) sx
    return $ sourceTBMChan c

(这个简单的添加可在此处获得)。

对您的版本的一些评论mergeSources(对它们持保留态度,可能是我不太了解某些东西):

  • 使用...TMChan而不是...TBMChan似乎很危险。如果作者比读者快,你的堆就会爆炸。查看您的图表,如果您的 TCP 对等方读取数据的速度不够快,这似乎很容易发生。所以我肯定会使用...TBMChan可能很大但有限的界限。
  • 你不需要MonadSTM m约束。所有 STM 的东西都IO

    liftSTM = liftIO . atomically
    

    也许这会在使用mergeSources'in时对您有所帮助serverApp

  • 只是外观问题,我发现

    liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn
    

    liftA2由于在(->) rmonad上使用,因此很难阅读。我会说

    do
        c <- liftSTM newTMChan
        fsrc sx c
        retn c
    

    会更长,但更容易阅读。

您能否创建一个可以玩的独立项目serverApp

于 2013-07-05T19:08:55.897 回答