3

我正在编写一个服务器,其中一个要求是它需要能够将数据推送到客户端,而无需客户端直接请求数据。我正在使用导管,但感觉这超出了导管的能力。我遇到的问题是似乎没有办法判断套接字是否有可用的数据,并且 await 将阻止执行,直到有可用的数据。假设我有以下功能

getPacket :: Conduit ByteString IO ClientPacket --take a bytestring and yield a ClientPacket i.e. the ByteString deserialized into a sensible form
processPacket :: Conduit ClientPacket IO ServerPacket --take a ClientPacket and yield a ServerPacket i.e. a response to the client's request
putPacket :: Conduit ServerPacket IO ByteString --serialize the ServerPacket

然后我将管道与来自 Conduit.Network 库的源和接收器连接在一起

appSource appData $$ getPacket =$= processPacket =$= putPacket $= appSink appData

现在,我从管道外部引入一个数据源,并且我想将该数据合并到管道中。例如,如果这是一个聊天服务器,则外部数据将是其他客户端发送的消息。问题是,无论我尝试在哪里引入这些外部数据,它都会被调用 await 阻塞。本质上,我最终会得到如下所示的代码。

yield processOutsideData --deal with the outside data
data <- await            --await data from upstream

处理更多外部数据的唯一方法是上游组件产生某些东西,但上游只有在从客户端获取数据时才会产生,这正是我试图避免的。我已经尝试使用多个线程和 TChan 来解决这个问题,但似乎 appSource 和 appSink 必须在同一个线程中使用,否则我会从 recv 获得无效的文件描述符异常(这是有道理的)。

但是,如果套接字源和接收器在同一个线程中运行,我会再次遇到等待阻塞的问题,并且我无法检查套接字是否有可用的数据。在这一点上,我似乎已经用导管撞到了墙上。

但我真的很喜欢使用导管,并且更愿意继续使用它们。所以我的问题是:有没有办法做我想要通过管道实现的目标?

4

1 回答 1

1

Michael Snoyman 的管道网络示例使用并发。telnet 客户端示例运行一个线程用于发送输入,另一个用于显示收到的内容。我已经对其进行了调整以发送和接收整行

{-# LANGUAGE OverloadedStrings #-}
import Conduit
import Control.Concurrent.Async (concurrently)
import Control.Monad            (liftM, void)
import Data.ByteString          (ByteString)
import Data.ByteString.Char8    (unpack)
import Data.Conduit.Network
import Data.String              (IsString, fromString)
import Network                  (withSocketsDo)

getLines :: (IsString a, MonadIO m) => Producer m a
getLines = repeatMC . liftM fromString $ liftIO getLine

putLines :: (MonadIO m) => Consumer ByteString m ()
putLines = mapM_C $ liftIO . putStrLn . unpack

main :: IO ()
main = withSocketsDo $
    runTCPClient (clientSettings 4000 "localhost") $ \server ->
        void $ concurrently
            (getLines $$ appSink server)
            (appSource server $$ putLines)

我们可以在服务器上做同样的事情。创建一个STM通道,将接收到的数据写入通道,然后将数据从通道发送到客户端。这使用stm-conduit包的简单包装器围绕 STM 通道,sourceTBMChan以及sinkTBMChan.

{-# LANGUAGE OverloadedStrings #-}
import Conduit
import Control.Concurrent.Async       (concurrently)
import Control.Concurrent.STM.TBMChan (newTBMChan)
import Control.Monad                  (void)
import Control.Monad.STM              (atomically)
import Data.Conduit.Network
import Data.Conduit.TMChan            (sourceTBMChan, sinkTBMChan)
import Network                        (withSocketsDo)

main :: IO ()
main = withSocketsDo $ do
    channel <- atomically $ newTBMChan 10
    runTCPServer (serverSettings 4000 "*") $ \server ->
        void $ concurrently
            (appSource server $$ sinkTBMChan channel False)
            (sourceTBMChan channel $$ appSink server)

如果我们只连接一个客户端运行服务器,它会回显客户端发送的内容。

----------
| a      | (sent)
| a      | (received)
| b      | (sent)
| b      | (received)
| c      | (sent)
| c      | (received) 
----------

如果我们在连接多个客户端的情况下运行服务器,则消息将分布在客户端之间,一个客户端获取每条消息。

----------             ----------
| 1      | (sent)      | 1      | (received)
| 2      | (sent)      | 3      | (received)
| 2      | (received)  |        |
| 3      | (sent)      |        |
|        |             |        |
|        |             |        |
----------             ----------

此示例不处理客户端关闭连接时要执行的操作。

于 2015-08-11T15:16:17.223 回答