8

conduit下面是一些使用、network-conduit和实现小型接收服务器的代码stm-conduit。它在套接字上接收数据,然后通过 STM 通道将其流式传输到主线程。

import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMChan (newTBMChan, TBMChan())
import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Trans.Class

import Data.ByteString (ByteString)
import qualified Data.ByteString as B
import Data.Conduit
import qualified Data.Conduit.Binary as DCB
import Data.Conduit.Extra.Resumable
import Data.Conduit.Network (sourceSocket)
import Data.Conduit.TMChan (sinkTBMChan, sourceTBMChan, mergeSources)

import System.Directory (removeFile)
import System.IO

type BSChan = TBMChan ByteString

listenSocket :: Socket -> Int -> IO BSChan
listenSocket soc bufSize = do
    chan <- atomically $ newTBMChan bufSize
    forkListener chan
    return chan
  where
    forkListener chan = void . forkIO $ listen soc 2 >> loop where 
      loop = do
        (conn, _) <- accept soc
        sourceSocket conn $$ sinkTBMChan chan
        close conn
        loop

main :: IO ()
main = do
  soc <- socket AF_UNIX Stream 0
  bind soc (SockAddrUnix "mysock")
  socChan <- listenSocket soc 8
  sourceTBMChan socChan $$ DCB.sinkHandle stdout
  removeFile "mysock"

(在实际应用程序中,来自套接字的数据流与其他一些数据流合并,这就是为什么我不在侦听器中直接处理它的原因)。

问题是,我曾期望它在主线程被杀死之前保持打开状态,而是在套接字上收到第一条消息后退出。我无法弄清楚它为什么这样做,除非它是接收器(在第 2 到最后一行)在看到第一个数据流的结尾时退出。我可以说服它不要这样做吗?有一些Conduit关于使源可恢复的东西,但不是接收器。

4

4 回答 4

7

从以下文档sinkTBMChan

当 sink 关闭时,通道也将关闭。

因此,当第一个套接字句柄关闭时,它会导致SourcefromsourceSocket关闭,关闭连接的接收器,进而关闭TBMChan传播到sinkHandle停止接收器的接收器。

解决此问题的最简单方法可能是将您更改loop为不会在连接之间关闭的自定义源并将该源连接到TBMChan.

listenSocket :: Socket -> Int -> IO BSChan
listenSocket soc bufSize = do
    chan <- atomically $ newTBMChan bufSize
    forkListener chan
    return chan
  where
    forkListener chan = void . forkIO $ do
      listen soc 2
      loop $$ sinkTBMChan chan

    loop = do
      (conn, _) <- liftIO $ accept soc
      sourceSocket conn
      liftIO $ close conn
      loop
于 2014-01-06T17:17:25.640 回答
5

协调关闭频道的作者和读者是一个不小的问题,但您可以重用pipes生态系统中的解决方案来解决这个问题,即使用pipes-concurrency库。这个库提供了几个pipes独立的实用程序,您可以在conduit库中重复使用这些实用程序来在读取器和写入器之间进行通信,以便每一方自动正确地知道何时清理,您也可以手动清理任一方。

您从pipes-concurrency库中使用的关键函数是spawn. 它的类型是:

spawn :: Buffer a -> IO (Output a, Input a)

Buffer指定要使用的底层 STM 通道抽象。从您的示例代码来看,听起来您想要一个Bounded缓冲区:

spawn (Bounded 8) :: IO (Output a, Input a)

在这种情况下, Thea可以是任何东西,所以它可以是ByteString,例如:

spawn (Bounded 8) :: IO (Output ByteString, Input ByteString)

Input和的Output行为就像一个邮箱。您可以通过向 s 发送send数据来将消息添加到邮箱,并通过从 s发送数据从Output邮箱中取出消息(按 FIFO 顺序):recvInput

-- Returns `False` if the mailbox is sealed
send :: Output a -> a -> STM Bool

-- Returns `Nothing` if the mailbox is sealed
recv :: Input a -> STM (Maybe a)

其巧妙的功能pipes-concurrency是,如果邮箱没有读者或没有写者,它会检测垃圾收集器自动密封邮箱。这避免了常见的死锁来源。

如果您使用的是pipes生态系统,您通常会使用以下两个更高级别的实用程序来读取和写入邮箱。

-- Stream values into the mailbox until it is sealed
toOutput :: Output a -> Consumer a IO ()

-- Stream values from the mailbox until it is sealed
fromInput :: Input a -> Producer a IO ()

但是,由于核心机制是pipes独立的,您可以重写conduit这些函数的等效版本:

import Control.Monad.Trans.Class (lift)
import Data.Conduit
import Pipes.Concurrent

toOutput' :: Output a -> Sink a IO ()
toOutput' o = awaitForever (\a -> lift $ atomically $ send o a)

fromInput' :: Input a -> Source IO a
fromInput' i = do
    ma <- lift $ atomically $ recv i
    case ma of
        Nothing -> return ()
        Just a  -> do
            yield a
            fromInput' i

然后你的主要功能看起来像这样:

main :: IO ()
main = do
    soc <- socket AF_UNIX Stream 0
    bind soc (SockAddrUnix "mysock")
    (output, input) <- spawn (Bounded 8)
    forkIO $ readFromSocket soc $$ toOutput output
    fromInput input $$ DCB.sinkHandle stdout
  removeFile "mysock"

...从您readFromSocket的.SourceSocket

然后,您也可以自由地写入output使用其他数据源,而不必担心在完成后必须协调它们或正确处理或处理input它们output

要了解更多pipes-concurrency,我推荐阅读官方教程

于 2014-01-07T02:01:51.843 回答
2

我认为@shang 的答案是正确的,我会走得更远一点,说writeTBMChan这里的行为看起来像是更好的罪魁祸首。我建议将其更改为不自动关闭TBMChan. 这个想法的一个简单实现是:

sinkTBMChan chan = awaitForever $ liftIO . atomically . writeTBMChan chan

如果您在程序中使用它,它将按预期工作。

于 2014-01-07T05:51:55.360 回答
1

因此,这是一个不涉及创建可恢复接收器的答案。sourceSocketinnetwork-conduit允许单个连接,但我们可以在内部实现重新连接行为(sourceSocket对代码表示歉意,我认为它需要清理,但至少它可以工作!):

sourceSocket :: (MonadIO m) => Socket -> Producer m ByteString
sourceSocket sock =
    loop
  where
    loop = do
      (conn, _) <- lift . liftIO $ accept sock
      loop' conn
      lift . liftIO $ close conn
      loop
    loop' conn = do
      bs <- lift . liftIO $ recv conn 4096
      if B.null bs
        then return ()
        else yield bs >> loop' conn

这里的一个问题是它永远不会退出(直到程序终止)。在我的用例中这不是问题,因为套接字应该在程序的生命周期内一直监听。

于 2014-01-06T17:08:44.627 回答