1

我在 ZeroMQ 中遇到了一个简单的发布-订阅示例的问题。我已阅读大量文档,但似乎找不到答案。

我从 NuGetlibzmq得到。clrzmq对于下面的两个函数,套接字地址是:

let sktAddr = "tcp://127.0.0.1:3456"

这是一个简单的发布者,每秒排队一条消息。

// Publisher - this seems to work fine
let publisher () : unit =
    let skt = (new ZMQ.Context()).Socket(ZMQ.SocketType.PUB)
    skt.SetSockOpt(ZMQ.SocketOpt.LINGER, 0)
    skt.Bind sktAddr
    skt.SendMore("TEST_TOPIC", Text.Encoding.Unicode) |> ignore
    let rec h1 () : unit =
        let nv = DateTime.Now.ToUniversalTime().ToString()
        printfn "Sending value: %s" nv
        skt.Send(Text.Encoding.Unicode.GetBytes nv) |> ignore
        Threading.Thread.Sleep 1000
        let swt = new Threading.SpinWait()
        swt.SpinOnce()
        if Console.KeyAvailable then
            match Console.ReadKey().Key with
            | ConsoleKey.Q -> ()
            | _ -> h1()
        else
            h1()
    h1()

下面的简单订阅者不会引发错误,但会挂在下面指示的行。

// Subscriber
let subscriber () : unit =
    let skt = (new ZMQ.Context()).Socket(ZMQ.SocketType.SUB)
    skt.Connect sktAddr
    skt.Subscribe("TEST_TOPIC", Text.Encoding.Unicode)
    let rec h1 () : unit =
        let oDat = skt.Recv() // THE PROGRAMME HANGS HERE!
        let strODat = (new Text.UnicodeEncoding()).GetString oDat
        if oDat <> null then
            printfn "Received: %s" strODat
        else
            printfn "No data received"
        let swt = new System.Threading.SpinWait()
        swt.SpinOnce()
        if Console.KeyAvailable then
            match Console.ReadKey().Key with
            | ConsoleKey.Q -> ()
            | _ -> h1()
        else
            h1()
    h1()

我已阅读此问题,但未提供任何解决方案。所以我在这里发布一个新问题。

在此先感谢您的帮助。

4

1 回答 1

4

我相信问题出在发布者身上:

skt.SendMore("TEST_TOPIC", Text.Encoding.Unicode)

不知道 F#,上面的语句似乎发生在循环之外。如果订阅者正在侦听TEST_TOPIC,则来自发布者的任何消息都需要主题名称位于每条消息的内容之前,因此发布者每次发送时都必须这样做:

skt.SendMore("TEST_TOPIC", Text.Encoding.Unicode)
skt.Send("some data here", Text.Encoding.Unicode)

..尝试这个...

let publisher () : unit =
    let skt = (new ZMQ.Context()).Socket(ZMQ.SocketType.PUB)
    skt.SetSockOpt(ZMQ.SocketOpt.LINGER, 0)
    skt.Bind sktAddr

    let rec h1 () : unit =
        let nv = DateTime.Now.ToUniversalTime().ToString()
        printfn "Sending value: %s" nv
        skt.SendMore("TEST_TOPIC", Text.Encoding.Unicode) |> ignore
        skt.Send(Text.Encoding.Unicode.GetBytes nv) |> ignore
        Threading.Thread.Sleep 1000
        let swt = new Threading.SpinWait()
        swt.SpinOnce()
        if Console.KeyAvailable then
            match Console.ReadKey().Key with
            | ConsoleKey.Q -> ()
            | _ -> h1()
        else
            h1()
    h1()

..并且订阅者必须为每条消息接收两次:

// Subscriber
let subscriber () : unit =
    let skt = (new ZMQ.Context()).Socket(ZMQ.SocketType.SUB)
    skt.Connect sktAddr
    skt.Subscribe("TEST_TOPIC", Text.Encoding.Unicode)
    let rec h1 () : unit =
        let topicName = skt.Recv()
        let oDat = skt.Recv()
        let strODat = (new Text.UnicodeEncoding()).GetString oDat
        if oDat <> null then
            printfn "Received: %s" strODat
        else
            printfn "No data received"
        let swt = new System.Threading.SpinWait()
        swt.SpinOnce()
        if Console.KeyAvailable then
            match Console.ReadKey().Key with
            | ConsoleKey.Q -> ()
            | _ -> h1()
        else
            h1()
    h1()
于 2013-08-08T13:54:28.957 回答