0

首先,值得一提的是,在单个 F# 解决方案中,Bond消息的序列化和反序列化工作正常。但是,我无法正确处理通过 ZeroMQ 发送和/或接收消息。

以下程序的订阅方存在运行时错误。.bond 文件是使用bond 编译器定义和编译的。然后从 C# 创建一个 dll 以从 F# 调用。然后我有两个 F# 程序。一个通过 tcp 套接字发布序列化数据,另一个是订阅者。当在 sub 上接收到消息时,尝试 Unmarshal 原始数据的行是导致运行时错误的行。任何人都可以看到这个的原因吗?

[编辑] 根据 Fyodor 的评论,我在发布者方面进行了更改,从而更改了订阅者方面的错误。所以这个错误可能与我如何打包和解包信息有关。

这是 .bond 文件

namespace Examples

struct Record
{
    0: map<string, double> payload;
}

这是发布者:

// publisher

open System
open Bond
open Bond.Protocols
open Bond.IO.Safe
open ZeroMQ

let ctx = new ZContext()
let publisher = new ZSocket(ctx, ZSocketType.PUB)
publisher.Bind("tcp://*:5556")

let src = new Examples.Record()
src.payload.Add("a", 1.)
src.payload.Add("b", 2.)

let output = new OutputBuffer()
let writer = new CompactBinaryWriter<OutputBuffer>(output)

while true do
    Marshal.To(writer, src)
    //let input = new InputBuffer(output.Data)
    //let byteArr = input.ReadBytes(int(input.Length - 1L))
    let updateFrame = new ZFrame(System.Text.Encoding.ASCII.GetString output.Data.Array)
    publisher.Send(updateFrame)

这是订户:

// subscriber

open Bond
open Bond.Protocols
open Bond.IO.Safe
open System
open System.Text
open ZeroMQ

let ctx = new ZContext()
let subscriber = new ZSocket(ctx, ZSocketType.SUB)
subscriber.Connect("tcp://127.0.0.1:5556")
subscriber.SubscribeAll()

let output = new OutputBuffer()    
while true do    
    let received = subscriber.ReceiveFrame()
    let byteArr = Encoding.ASCII.GetBytes (received.ReadString())
    let arrSeg = ArraySegment<byte>(byteArr)
    let input = new InputBuffer(arrSeg)
    let dst = Unmarshal<Examples.Record>.From(input)
    for KeyValue(k, v) in dst.payload do
        printfn "%A %A" k v
4

1 回答 1

4

在接收端,当您尝试将编组的 Bond Compact Binary 解码为 ASCII 字符串时,您会丢失一些有效负载。当封送一个像Record压缩二进制这样的结构时,有效载荷的前四个字节是0x43 0x42 0x10 0x00. 从 ZFrame 读取字符串时,遇到的第一个嵌入NUL (0x00)表示字符串结束,无论帧的大小如何。因此,读取端只看到0x43 0x42 0x10而不是整个有效负载(我测试时为 29 个字节)。

由于 Compact Binary 是二进制协议,因此您需要使用ZFrame在发布者端获取缓冲区的构造函数:

let updateFrame = new ZFrame(output.Data.Array, output.Data.Offset, output.Data.Count)

在订阅者方面,您只需要读取缓冲区:

let byteArr = received.Read()

此外,在发布者方面,您不断地将数据累积到同一个 OutputBuffer 中。output.Position在编组下一条记录以重新使用缓冲区而不是增加缓冲区之前,您需要重置为 0:

while true do  
    Marshal.To(writer, src)
    let updateFrame = new ZFrame(output.Data.Array, output.Data.Offset, output.Data.Count)output.Data.Array)
    publisher.Send(updateFrame)
    output.Position <- 0

还有一点需要注意:分配给 an 的默认缓冲区OutputBuffer是 65KiB。一旦您知道有效载荷将有多大,请考虑将其缩小。

注意:我在一个具有类似语义的 C# 应用程序中调试了它。这是我使用的:

namespace so_q_zmq
{
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading.Tasks;
    using Bond;
    using Bond.IO.Safe;
    using Bond.Protocols;
    using ZeroMQ;

    [Schema]
    class Record
    {
        [Id(0)]
        public Dictionary<string, double> payload = new Dictionary<string, double>();
    }

    class Program
    {
        static void Main(string[] args)
        {
            var pTask = Task.Run(() =>
            {
                try
                {
                    Publisher();
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Publisher failed: {0}", ex);
                }
            });

            var sTask = Task.Run(() =>
            {
                try
                {
                    Subscriber();
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Subscriber failed: {0}", ex);
                }
            });

            Task.WaitAll(pTask, sTask);
            Console.WriteLine("Done");
            Console.ReadLine();
        }

        static void Publisher()
        {
            var ctx = new ZContext();
            var publisher = new ZSocket(ctx, ZSocketType.PUB);
            publisher.Bind("tcp://127.0.0.1:12345");

            var src = new Record();
            src.payload.Add("a", 1.0);
            src.payload.Add("b", 2.0);

            var output = new OutputBuffer();
            var writer = new CompactBinaryWriter<OutputBuffer>(output);

            for (;;)
            {
                Marshal.To(writer, src);
                // INCORRECT:
                // var str = Encoding.ASCII.GetString(output.Data.Array);
                // var updateFrame = new ZFrame(str);
                var updateFrame = new ZFrame(output.Data.Array, output.Data.Offset, output.Data.Count);
                publisher.Send(updateFrame);
                output.Position = 0;
            }
        }

        static void Subscriber()
        {
            var ctx = new ZContext();
            var subscriber = new ZSocket(ctx, ZSocketType.SUB);
            subscriber.Connect("tcp://127.0.0.1:12345");
            subscriber.SubscribeAll();

            for (;;)
            {
                var received = subscriber.ReceiveFrame();
                // INCORRECT
                // var str = received.ReadString();
                // var byteArr = Encoding.ASCII.GetBytes(str);
                var byteArr = received.Read();
                var arrSeg = new ArraySegment<byte>(byteArr); // There's an InputBuffer ctor that takes a byte[] directly
                var input = new InputBuffer(arrSeg);
                var dst = Unmarshal<Record>.From(input);
                foreach (var kvp in dst.payload)
                {
                    Console.WriteLine("{0} {1}", kvp.Key, kvp.Value);
                }
            }
        }
    }
}
于 2017-01-23T23:04:47.480 回答