2

我正在使用 Pipes-2.1.0 包和 zeromq3-haskell 包来构建一个小消息管道。除了我无法理解 Frames 的最终确定之外,一切似乎都进展顺利。

在接下来的框架中,我获得了两个资源;zeromq 上下文和 zeromq 套接字。然后我不断地等待消息(以 的形式ByteStrings)在 zeromq 套接字上发布。

{-# LANGUAGE RebindableSyntax    #-}
{-# LANGUAGE ScopedTypeVariables #-}

module PipesZeroMQ where

import           Control.Frame
import           Control.IMonad.Do
import           Control.IMonad.Trans
import qualified Control.Monad          as M
import           Data.ByteString        (ByteString)
import           Data.String
import           Prelude                hiding (Monad(..))
import qualified System.ZMQ3            as ZMQ

type Address = String

fromList :: (M.Monad m) => [b] -> Frame b m (M a) (M a) ()
fromList xs = mapMR_ yield xs

publisher :: Address -> Frame Void IO (M ByteString) C ()
publisher addr = do
  c  <- liftU $ ZMQ.init 1
  s  <-liftU $ ZMQ.socket c ZMQ.Pub
  liftU $ ZMQ.bind s addr   
  liftU $ print "Socket open for business!!!"

  foreverR $ do
    bs <- await
    finallyF (ZMQ.close s M.>> ZMQ.term c M.>> print "ZMQ socket closed") $ do
         (liftU $ ZMQ.send s [] bs)
         (liftU (print "Sending message"))

现在如果我试试这个:

λ> runFrame $ (publisher localAddress) <-< (fromList ["This", "that", "that"] >> close)

我明白了:

"Socket open for business"
"Sending message"
"ZMQ socket closed"
*** Exception: ZMQError { errno = 88, source = "send", message = "Socket operation on non-socket" }

publisher收到后完成,但一个BytesString

为什么会这样?

我对在 Pipes-2.1.0 中使用 Frames 进行最终确定有什么误解?

如果我开始攻击外面的树,它还有机会吗?

4

1 回答 1

3

您在编写publisher函数时犯了一个错误:

foreverR $ do
    bs <- await
    finallyF (ZMQ.close s M.>> ZMQ.term c M.>> print "ZMQ socket closed") $ do
         (liftU $ ZMQ.send s [] bs)
         (liftU (print "Sending message"))

您可能想将finallyFOUTSIDE 放在foreverR循环之外:

finallyF (...) $ foreverR $ do
    bs <- await
    liftU $ ZMQ.send s [] bs)
    liftU (print "Sending message")

你写它的方式,它在每​​次发送后完成,所以它完全按照你告诉它做的事情:每次发送后完成。 finallyF一旦它包装的动作完成后调用终结器,无论它是否成功终止。你也可以catchF在这种情况下使用,因为循环永远不会终止:

 catchF (...) $ foreverR $ do
    bs <- await
    liftU $ ZMQ.send s [] bs)
    liftU (print "Sending message")

或者,您可以将其保留在循环内但切换到,catchF以便在每次发送后不会运行终结器:

foreverR $ do
    bs <- await
    catchF (ZMQ.close s M.>> ZMQ.term c M.>> print "ZMQ socket closed") $ do
         (liftU $ ZMQ.send s [] bs)
         (liftU (print "Sending message"))

另外,如果您计划编写基于管道的 zeroMQ 库,请与我保持联系,因为我计划在下一个版本中将帧返回到普通的 monad,同时对功能进行许多新的增强,例如关闭和重新初始化资源的能力。要联系我,请使用我的 gmail.com 地址和用户名 Gabriel439。

于 2012-07-14T00:21:22.153 回答