0

我正在使用 GoLang protobuf 对通过单个 tcp 连接发送的消息进行编码(和解码)。

.proto 结构

message Prepare{
   int64 instance = 1;
   int64 round = 2;
   int64 nodeId = 3;
}

然后我使用该protoc工具生成相应的存根。

这就是我将内容写入线路的方式。

func (t *Prepare) Marshal(wire io.Writer) {

    data, err := proto.Marshal(t)
    if err != nil {
        panic(err)
    }
    _, err = wire.Write(data)
    if err != nil {
        panic(err)
    }
}

这就是我在接收方阅读和解组的方式。

func (t *Prepare) Unmarshal(wire io.Reader) error {
    data := make([]byte, 8*1024*1024) 
    length, err := wire.Read(data)
    if err != nil {
        panic(err)
    }
    err = proto.Unmarshal(data[:length], t)
    if err != nil {
        panic(err)
    }
    return nil
}

如果对于每个 protobuf 消息,都会产生一个新的 tcp 连接,则上述方法可以正常工作。但是当使用单个 tcp 连接来传输多个消息(持久连接)时,解组失败并出现错误proto: invalid field number

出现此问题的原因是,使用单个连接发送的 protobuf 消息不强制任何消息边界,因此在读取缓冲区时可以包含对应于 1) 多个 protobuff 消息和 2) 部分 protobuff 消息的字节length, err := wire.Read(data)data

protobuf 文档提到以下作为解决方案。

如果要将多条消息写入单个文件或流,则由您来跟踪一条消息的结束位置和下一条消息的开始位置。协议缓冲区有线格式不是自定界的,因此协议缓冲区解析器无法自行确定消息的结束位置。解决此问题的最简单方法是在编写消息本身之前写入每条消息的大小。当您读回消息时,您读取大小,然后将字节读入单独的缓冲区,然后从该缓冲区解析。(如果您想避免将字节复制到单独的缓冲区,请查看 CodedInputStream 类(在 C++ 和 Java 中),它可以被告知将读取限制为一定数量的字节。)

虽然这是一种直观的方法,但它归结为一个先有鸡还是先有蛋的问题。写入线路的字节数组的长度(取自data, err := proto.Marshal(t); len(data))不是固定的,也不知道表示这个数字需要多少字节(len(data))。现在我们遇到了同样的问题,如何将字节数组的长度发送到接收方读取,而不知道实际需要多少字节length(换句话说,接收方如何知道对应的字节数到场length

对此有什么建议吗?

谢谢

4

2 回答 2

2

我会推荐使用 gRPC,但你已经说过你不想要那个。我还可以推荐发送简单的 UTP 包,因为 UDP 根本不需要连接。

如果您想坚持当前的方法,解决方案很简单:将 protobuf 编组为字节数组后,您就知道它的长度了。这是len(data)你想先写的值。实际写入的字节数wire.Write()将是相同的。如果不是,则连接有问题,并且包仅部分写入。所以接收器无法将其解组。

接收时,首先读取长度,准备一个大小正确的缓冲区,或者更好的是,制作一个LimitedReader并从那里解组。

字节数应编码为整数。您可以使用 32 位或 64 位值,您还需要在小端和大端之间做出决定 - 您使用的无关紧要,只要发送方和接收方的大小和字节序相同。

看看https://pkg.go.dev/encoding/binary和 ByteOrder 上定义的函数:

binary.LittleEndian.PutUint64(w, uint64(len(data)))
length := int64(binary.LittleEndian.Uint64(r))

当然,即使有一个简单的错误,或者你只错了一个字节,那么所有剩余的数据实际上都是无用的。通过将消息作为专用 UDP 包发送,您可以避免此问题。

于 2021-08-03T12:47:33.007 回答
0

针对问题中提到的确切情况详细说明上述答案

func (t *Prepare) Marshal(wire io.Writer) {
    data, err := proto.Marshal(t)
    if err != nil {
        panic(err)
    }
    lengthWritten := len(data)
    var b [16]byte
    bs := b[:16]
    binary.LittleEndian.PutUint64(bs, uint64(lengthWritten))
    _, err = wire.Write(bs)
    if err != nil {
        panic(err)
    }
    _, err = wire.Write(data)
    if err != nil {
        panic(err)
    }
}

func (t *Prepare) Unmarshal(wire io.Reader) error {

    var b [16]byte
    bs := b[:16]

    _, err := io.ReadFull(wire, bs)
    numBytes := uint64(binary.LittleEndian.Uint64(bs))

    data := make([]byte, numBytes)
    length, err := io.ReadFull(wire, data)
    if err != nil {
        panic(err)
    }
    err = proto.Unmarshal(data[:length], t)
    if err != nil {
        panic(err)
    }
    return nil
}
于 2021-08-03T14:38:44.803 回答