18

我正在处理来自 spinn3r 的数据,它由多个不同的 protobuf 消息组成,这些消息序列化为一个字节流:

http://code.google.com/p/spinn3r-client/wiki/Protostream

“protostream 是协议缓冲区消息流,根据 Google 协议缓冲区规范在线上编码为长度前缀变量。流包含三个部分:标头、有效负载和尾部标记。”

这似乎是 protobuf 的一个非常标准的用例。事实上,protobuf 核心发行版为 C++ 和 Java 提供了 CodedInputStream。但是,protobuf 似乎没有为 python 提供这样的工具——“内部”工具不是为这种外部使用设置的:

https://groups.google.com/forum/?fromgroups#!topic/protobuf/xgmUqXVsK-o

所以......在我去拼凑一个python varint解析器和用于解析不同消息类型流的工具之前:有人知道这方面的任何工具吗?

为什么 protobuf 中缺少它?(或者我只是没有找到它?)

对于 protobuf 来说,这似乎是一个很大的差距,尤其是与 thrift 的“传输”和“协议”等效工具相比。我看对了吗?

4

3 回答 3

15

看起来另一个答案中的代码可能是从这里提取的。在使用此文件之前检查许可证,但我设法让它varint32使用如下代码读取 s:

import sys
import myprotocol_pb2 as proto
import varint # (this is the varint.py file)

data = open("filename.bin", "rb").read() # read file as string
decoder = varint.decodeVarint32          # get a varint32 decoder
                                         # others are available in varint.py

next_pos, pos = 0, 0
while pos < len(data):
    msg = proto.Msg()                    # your message type
    next_pos, pos = decoder(data, pos)
    msg.ParseFromString(data[pos:pos + next_pos])

    # use parsed message

    pos += next_pos
print "done!"

这是一个非常简单的代码,旨在加载由varint32s 分隔的单一类型的消息,它描述了下一条消息的大小。


更新:也可以使用以下方法直接从 protobuf 库中包含此文件:

from google.protobuf.internal.decoder import _DecodeVarint32
于 2014-02-14T07:08:59.380 回答
3

我已经实现了一个小型 python 包来将多个 protobuf 消息序列化到一个流中并从一个流中反序列化它们。您可以通过以下方式安装它pip

pip install pystream-protobuf

这是将两个 protobuf 消息列表写入文件的示例代码:

import stream

with stream.open("test.gam", "wb") as ostream:
    ostream.write(*objects_list)
    ostream.write(*another_objects_list)

然后vg_pb2.py从流中读取相同的消息(例如定义的对齐消息):

import stream
import vg_pb2

alns_list = []
with stream.open("test.gam", "rb") as istream:
    for data in istream:
        aln = vg_pb2.Alignment()
        aln.ParseFromString(data)
        alns_list.append(aln)
于 2016-08-01T13:47:39.410 回答
-3

这很简单,我可以理解为什么没有人费心制作可重用工具:

'''
Parses multiple protobuf messages from a stream of spinn3r data
'''

import sys
sys.path.append('python_proto/src')
import spinn3rApi_pb2
import protoStream_pb2

data = open('8mny44bs6tYqfnofg0ELPg.protostream').read()

def _VarintDecoder(mask):
    '''Like _VarintDecoder() but decodes signed values.'''

    local_ord = ord
    def DecodeVarint(buffer, pos):
        result = 0
        shift = 0
        while 1:
            b = local_ord(buffer[pos])
            result |= ((b & 0x7f) << shift)
            pos += 1
            if not (b & 0x80):
                if result > 0x7fffffffffffffff:
                    result -= (1 << 64)
                    result |= ~mask
                else:
                    result &= mask
                return (result, pos)
            shift += 7
            if shift >= 64:
                ## need to create (and also catch) this exception class...
                raise _DecodeError('Too many bytes when decoding varint.')
    return DecodeVarint

## get a 64bit varint decoder
decoder = _VarintDecoder((1<<64) - 1)

## get the three types of protobuf messages we expect to see
header    = protoStream_pb2.ProtoStreamHeader()
delimiter = protoStream_pb2.ProtoStreamDelimiter()
entry     = spinn3rApi_pb2.Entry()

## get the header
pos = 0
next_pos, pos = decoder(data, pos)
header.ParseFromString(data[pos:pos + next_pos])
## should check its contents

while 1:
    pos += next_pos
    next_pos, pos = decoder(data, pos)
    delimiter.ParseFromString(data[pos:pos + next_pos])

    if delimiter.delimiter_type == delimiter.END:
        break

    pos += next_pos
    next_pos, pos = decoder(data, pos)
    entry.ParseFromString(data[pos:pos + next_pos])
    print entry
于 2012-07-15T01:34:46.633 回答