5

我为一个小型多人游戏实现了一个协议。它基于字节,因此为了反序列化接收到的消息,我必须遍历字节流并逐位解析它。在获得所有字节并知道消息类型后,我将字节放入反向构造函数中,该构造函数从原始字节构造协议数据单元。

这整个过程非常丑陋,不是真正的 OO 并且有不可读的if/else代码。我必须为reverseConstructor(byte[] bytes)我添加的每个协议数据单元 (pdu) 实现。每个 pdu 定义某种模式的方法(例如 schema = [1 byte int (id = x), x bytes ascii string, 4 bytes double]),并且使用该模式完成字节的处理,将更优雅。

我在这里得到了关于使用 google protobufs 的提示 (显然它们不符合我的需求,因为我必须更改协议以遵守 protobuf 标准)。

信息

我无法更改协议。有两种不同的场景(我不想同时支持它们甚至在同一个程序中):

  • 协议数据单元在标头中编码了一个长度字段
  • 协议数据单元没有长度字段,但可以从消息类型派生出消息何时/何地结束。

我个人是长度字段的粉丝。但有时您必须遵守其他人设计的协议。所以协议是固定的。它们都有一个标头,其中包含协议 id、唯一消息 id以及在第一种情况下的长度字段。

问题

谁能给我一个非常小的例子,两个简单的协议数据单元由高效的通用接收方法解析?我在 protobuf 教程中找到的唯一示例是以下类型:用户 a 发送消息 x,用户 b 期望消息 X,并且可以毫无问题地反序列化它。

但是如果用户 b 必须为消息 x、y 和 z 做好准备呢?如何以一种智能的方式在没有大量代码重复的情况下处理这种情况。

我也很欣赏设计原则的提示,使我能够在不使用外部库的情况下在这里实现更好的代码。


编辑

我认为这样是要走的路。您可以在此处找到更多代码。字节被动态读取,直到找到一个对象,然后缓冲区的位置被重置。

                while (true) {
                        if (buffer.remaining() < frameLength) {
                                buffer.reset();
                                break;
                        }
                        if (frameLength > 0) {
                                Object resultObj = prototype.newBuilderForType().mergeFrom(buffer.array(), buffer.arrayOffset() + buffer.position(), frameLength).build();
                                client.fireMessageReceived(resultObj);
                                buffer.position(buffer.position() + frameLength);
                                buffer.mark();
                        }
                        if (buffer.remaining() > fieldSize) {
                                frameLength = getFrameLength(buffer);
                        } else {
                                break;
                        }
                }

JavaDoc- mergeFrom

将数据解析为这种类型的消息,并将其与正在构建的消息合并。这只是 MessageLite.Builder.mergeFrom(CodedInputStream) 的一个小包装。 https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/Message.Builder#mergeFrom(byte[])

问题是这种类型的部分消息,但应该可以用通用方法解决这个问题。


样本

这是一个示例协议数据单元。它有一个长度字段。还有另一种情况,PDU 没有长度字段。这个 pdu 的大小是可变的。还有固定大小的PDU。

样品 pdu

为了完整起见。这里是协议数据单元中字符串的表示。

pdu中的字符串

4

7 回答 7

7

1. 协议设计

坦率地说,创建第一个协议实现而不考虑任何进一步的改变是一个常见的错误。作为练习,让我们尝试设计灵活的协议。

在此处输入图像描述

基本上,这个想法是将多个帧封装到彼此中。请注意,您有可用的有效载荷 ID,因此很容易识别序列中的下一帧。

您可以使用Wireshark来查看现实生活中的协议通常遵循相同的原则。

在此处输入图像描述

这种方法大大简化了数据包的分解,但仍然可以处理其他协议。

2.协议解码(剖析)

我花了很多时间为我以前的公司开发下一代网络分析仪。

在此处输入图像描述

无法公开所有细节,但关键特性之一是灵活的协议栈,能够识别协议帧。RTP就是一个很好的例子,因为在低层(通常是 UDP)上没有提示下一帧是 RTP 帧。开发了特殊的 VM 来执行解剖器和控制过程。

好消息是我有一些基于 Java 的剖析器的小型个人项目(我将跳过一些 javadoc 以节省几行代码)。

/**
 * High-level dissector contract definition. Dissector is meant to be a simple
 * protocol decoder, which analyzes protocol binary image and produces number
 * of fields.
 *
 * @author Renat.Gilmanov
 */
public interface Dissector {

    /**
     * Returns dissector type.
     */
    DissectorType getType();

    /**
     * Verifies packet data belongs to the protocol represented by this dissector.
     */
    boolean isProtocol(DataInput input, Dissection dissection);

    /**
     * Performs the dissection.
     */
    Dissection dissect(DataInput input, Dissection dissection);

    /**
     * Returns a protocol which corresponds to the current dissector.
     *
     * @return a protocol instance
     */
    Protocol getProtocol();
}

协议本身知道上层协议,因此当没有可用的直接提示时,可以遍历已知协议并使用isProtocol方法来识别下一帧。

public interface Protocol {

    // ...

    List<Protocol> getUpperProtocols(); }

正如我所说,RTP 协议处理起来有点棘手:

在此处输入图像描述

因此,让我们检查实现细节。验证基于有关协议的几个已知事实:

/**
 * Verifies current frame belongs to RTP protocol.
 *
 * @param input data input
 * @param dissection initial dissection
 * @return true if protocol frame is RTP
 */
@Override
public final boolean isProtocol(final DataInput input, final Dissection dissection) {
    int available = input.available();
    byte octet    = input.getByte();
    byte version  = getVersion(octet);
    byte octet2   = input.getByte(1);
    byte pt       = (byte) (octet2 & 0x7F);

    return ((pt < 0x47) & (RTP_VERSION == version));
}

解剖只是一组基本操作:

公共最终解剖解剖(数据输入输入,解剖d){

    // --- protocol header --------------------------------
    final byte octet1 = input.getByte(0);
    final byte version = getVersion(octet1);
    final byte p = (byte) ((octet1 & 0x20) >> 5);
    final byte x = (byte) ((octet1 & 0x10) >> 4);
    final byte cc = (byte) ((octet1 & 0x0F));

    //...

    // --- seq --------------------------------------------
    final int seq = (input.getInt() & 0x0000FFFF);
    final int timestamp = input.getInt();
    final int ssrc = input.getInt();

最后你可以定义一个协议栈:

public interface ProtocolStack {

    String getName();

    Protocol getRootProtocol();

    Dissection dissect(DataInput input, Dissection dissection, DissectOptions options);
}

在引擎盖下,它处理所有复杂性并逐帧解码数据包。最大的挑战是使解剖过程防弹且稳定。使用这种或类似的方法,您将能够组织您的协议解码代码。isProtocol的正确实现可能会让您处理不同的版本等。无论如何,我不会说这种方法很简单,但它提供了很大的灵活性和控制力。

3. 有没有通用的解决方案?

是的,有ASN.1

抽象语法符号一 (ASN.1) 是一种标准和符号,描述了在电信和计算机网络中表示、编码、传输和解码数据的规则和结构。形式化规则能够表示独立于机器特定编码技术的对象。形式表示法可以自动执行验证数据表示的特定实例是否遵守规范的任务。换言之,软件工具可用于验证。

以下是使用 ASN.1 定义的协议示例:

FooProtocol DEFINITIONS ::= BEGIN

    FooQuestion ::= SEQUENCE {
        trackingNumber INTEGER,
        question       IA5String
    }

    FooAnswer ::= SEQUENCE {
        questionNumber INTEGER,
        answer         BOOLEAN
    }

END

顺便说一句,有可用的Java Asn.1 编译器

如果您想 (1) 解析您的 asn1 文件 (2) 创建 .java 类和 (3) 编码/解码类的实例,JAC(Java Asn1 编译器)是一个工具。忘记所有 asn1 字节流,并利用 OOP!BER、CER 和 DER 均受支持。

最后

我通常建议做几个简单的 PoC 以找到可能的最佳解决方案。我决定不使用 ASN.1 以降低复杂性并留出一些优化空间,但它可能会对您有所帮助。

无论如何,尽你所能,让我们知道结果:)

您还可以查看以下主题:二进制和文本结构(数据包)的高效解码

4.更新:双向方法

很抱歉回答了很长的问题。我只是希望你有足够的选择来找到最好的解决方案。回答有关双向方法的问题:

  • 选项 1:您可以使用对称序列化方法:定义 DataOutput,编写序列化逻辑 - 大功告成。我只建议查看 BerkeleyDB API 和TupleBinding。它确实解决了同样的问题,提供了对存储/恢复过程的完全控制。

此类负责将条目与 TupleInput 和 TupleOutput 对象相互转换。它的两个抽象方法必须由一个具体的子类来实现,以在元组和键或数据对象之间进行转换。

entryToObject(TupleInput)
objectToEntry(Object,TupleOutput)
  • 选项 2:最通用的方法是定义包含一组字段的结构。每个字段都需要以下信息:
    • 姓名
    • 类型
    • 大小(位)

例如,对于 RTP,它将如下所示:

Version:          byte (2 bits)
Padding:          bool (1 bit)
Extension:        bool (1 bit)
CSRC Count:       byte (4 bits) 
Marker:           bool (1 bit)
Payload Type:     byte (7 bits)
Sequence Number:  int  (16 bits)

有了它,您可以定义读取/写入此类结构的通用方式。我知道的最接近的工作示例是Javolution Struct。请仔细阅读,他们有一个非常好的例子:

class Clock extends Struct { // Hardware clock mapped to memory.
     Unsigned16 seconds  = new Unsigned16(5); // unsigned short seconds:5 bits
     Unsigned16 minutes  = new Unsigned16(5); // unsigned short minutes:5 bits
     Unsigned16 hours    = new Unsigned16(4); // unsigned short hours:4 bits
     ...
 }
于 2013-08-22T21:10:11.883 回答
2

(注意:自从我使用 Java 已经有一段时间了,所以我用 C# 编写了这个,但你应该明白一般的想法)

总体思路是:

  1. 您的每个解析器基本上应该表示为一个接口,或一个委托(或方法,或函数指针),其签名如下:

    interface IParser<T>
    {   
         IParserResult<T> Parse(IIndexable<byte> input);
    }
    
  2. 解析操作的结果是IParserResult<T>接口的一个实例,它应该告诉您以下内容:

    • 解析是否成功,

    • 如果失败,为什么失败(没有足够的数据来完成解析,没有正确的解析器,或者 CRC 错误,或者解析时出现异常),

    • 如果成功,实际解析的消息值,

    • 如果成功,则下一个解析器偏移。

    换句话说,类似:

    interface IParserResult<T>
    {
         boot Success { get; } 
         ErrorType Error { get; } // in case it failed
         T Result { get; } // null if failed
         int BytesToSkip { get; } // if success, number of bytes to advance 
    }
    
  3. 您的解析器线程应该遍历解析器列表并检查结果。它应该或多或少像这样:

    // presuming inputFifo is a Queue<byte> 
    while (inputFifo.ContainsData) 
    {
         foreach (IParser parser in ListOfParsers) 
         {
             var result = parser.Parse(inputFifo);
    
             if (result.Success) 
             {
                 FireMessageReceived(result.Value);
                 inputFifo.Skip(result.BytesToSkip);
                 break;
             }
    
             // wrong parser? try the next one
             if (result.ErrorType == ErrorType.UnsupportedData)
             {
                 continue;
             }
    
             // otherwise handle errors
             switch (result.ErrorType) 
             {
                 ...
             }
         }
    }
    

IIndexable<byte>接口不是 .NET 的一部分,但它对于避免大量数组分配非常重要(这是CodeProject 文章)。

这种方法的好处是该Parse方法可以进行大量检查以确定它是否“支持”某个消息(检查 cookie、长度、crc 等)。我们在解析从不可靠连接在单独线程上不断接收的数据时使用这种方法,因此如果长度太短而无法判断消息是否有效,每个解析器也会返回“NotEnoughData”错误(在这种情况下循环中断并等待更多数据)。

[编辑]

此外(如果这对您也有帮助),我们使用“消息消费者”的列表(或准确地说是字典),这些“消息消费者”是强类型并与某个解析器/消息类型相关联。这样,在解析某个消息时,只会通知相关方。它基本上是一个简单的消息传递系统,您需要在其中创建解析器列表和映射字典(消息类型 -​​> 消息使用者)。

于 2013-08-19T12:12:22.073 回答
1

在 10,000 英尺高度,这是工厂模式有用的经典案例。如果您从工厂模式的角度考虑这个问题,您的代码会更干净(因此更容易优化)(我已经以另一种方式编写了它,所以不幸的是我知道 - 几天的工作减少到应用工厂模式后几个小时)。

[编辑...]

对于 bytes -> object 情况,您将需要读取足够多的字节来明确确定哪种类型的对象已通过网络传递,然后继续解析该对象序列化。

于 2013-08-21T17:23:48.010 回答
0

您创建一条消息(例如 myMessage),其中包含 x、y、z 的可选消息。这将在此处讨论。即这里是技术文档中 Foo、Barr、Baz 的示例

message OneMessage {
    // One of the following will be filled in.
    optional Foo foo = 1;
    optional Bar bar = 2;
    optional Baz baz = 3;
}
于 2013-08-16T13:25:04.847 回答
0

协议缓冲区基于二进制标签/值对定义自己的有线协议。如果您有一个无法更改的预先存在的协议,则不能使用 Protocol Buffers 来解析它。

于 2013-08-16T21:10:16.147 回答
0

我有一个想法:使用像JAXB这样的注释可以自动将消息对象转换为其定义的字节表示的过程。它还应该能够将原始字节重新创建/解组为消息对象(但仅在我猜的长度字段的场景中)。

但是使用注释将包括使用一些光反射。这最终可能会降低性能(?)。

这是一个示例(...当我编写示例时,我得出的结论是我可能可以使用 jaxb 注释。因为它们@XmlTypeAdapter还支持地图等。无论如何,这里是示例):

/**
 * Annotation that helps identifying data elements for encoding/decoding
 * byte packets.
 * 
 * Annotate public getter methods in message classes.
 * 
 * NOTE: just primitive types and strings supported.
 *
 */
@Retention (value = RetentionPolicy.RUNTIME)
@Target (value = ElementType.METHOD)
public @interface MessageAttribute
{
  /* relative position of attribute in byte packet
   * 0 = first, 1 = second, etc.
   */
  int position();

  /*
   * type of attribute
   */
  Class<?> type();
}
于 2013-08-23T10:56:47.397 回答
0

好吧,也许这没有帮助,但是:在使用非常相似的协议的 quake 中,算法是这样的(接收器/服务器已经知道玩家 ID)。

ByteBuffer frame; 

int first = frame.getInt(), magic = 0x4C444732;

if(  first != magic  )
   if( !player_list.containsKey(first) )  /* must be a "string" pdu (a chat)*/
      x = new StringPDU( frame );
   else                                  /* not a chat? must be a player_id */
      x = new PlayerNamePDU( frame ); 

else                                     /* starts with magic... a game startup pdu + playername
   x = new GamePDU( frame );             /*  maybe that's the player host, or must have at least
                                             one player */ 

每个 PDU 都有一个 readFrame 方法或一个从 ByteBuffer 读取字节的构造函数。它看起来很丑,但没有使用反射,是必要的。

class GamePDU extends PDU {

    byte  command; 
    short length; 
    byte  min_players;

    short time_to_start; 
    byte  num_players;  /// after this same as a player_name packet


    GamePDU( ByteBuffer b ) {
        command = b.readByte();
        length = b.readShort();
        min_players = b.readByte();
        time_to_start = b.readShort();
        num_players = b.readByte();
        // the rest of the frame is player name
        /// players_for_game.add( new PlayerPDU(b) );
        ///  this player is in the game_start pdu to ensure that the 
        //// player_list[ num_players ] has been allocated. and has a list head. ;) Whips!!
    }
    /** if the same code is reading/writing both ends, you don't have to worry about
        endianess or signededness. ;)
        In C, in parallel, some of the game code just whips!!!
       */
}

class PDU {}
class GamePDU  extends PDU {}
class PlayerNamePDU  extends PDU {}
class StringPDU  extends PDU {}
于 2013-08-24T08:20:54.613 回答