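Because of my limited C++ knowledge, I have been struggling with this for two days. What I need to do is parse a sequence of messages with the protobuf C++ API from a large file that may contain millions of such messages. Reading straight from the file is easy, because I can always call ReadVarint32 to get the size, push that size as a limit onto the CodedInputStream, and call ParseFromCodedStream, as described in this post. However, the I/O-level API I am using (libuv, actually) requires a fixed-size buffer to be allocated for every read callback. Obviously that chunk size has nothing to do with the size of the messages I am reading out.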
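This makes my life hard. Basically, every time I read from the file and fill the fixed-size buffer (say 16K), that buffer may contain hundreds of complete protobuf messages, but the last piece of the buffer is likely to be an incomplete message. So I thought: fine, what I should do is read as many messages as I can, then take the leftover piece at the end and prepend it to the next 16K buffer I read, and keep going until I reach the end of the file. I use ReadVarint32() to get the size, compare that number with the number of bytes left in the buffer, and keep reading as long as the message size is smaller.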
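The carry-over idea described above can be sketched without any protobuf dependency. This is a minimal sketch, not a definitive implementation: `FrameSplitter`, `Feed`, and `PendingBytes` are hypothetical names, the varint decoder is a simplified hand-written one (it treats a length prefix longer than 5 bytes as malformed), and the handler is where a real program would presumably call something like `message.ParseFromArray(payload, size)`. The key point is that nothing is consumed until both the length prefix and the whole payload are present, so the size bytes are never lost between chunks.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <utility>
#include <vector>

// Hypothetical helper (not part of protobuf): accumulates fixed-size chunks
// and emits every complete varint-length-prefixed frame seen so far.
class FrameSplitter {
public:
    using Handler = std::function<void(const char* payload, std::size_t size)>;

    explicit FrameSplitter(Handler handler) : handler_(std::move(handler)) {
        assert(handler_ && "a handler is required");
    }

    // Feed one chunk, e.g. the bytes delivered to a read callback.
    // Returns false if a malformed length prefix is found.
    bool Feed(const char* data, std::size_t len) {
        pending_.insert(pending_.end(), data, data + len);  // plain append
        std::size_t pos = 0;
        for (;;) {
            std::uint32_t size = 0;
            std::size_t header = 0;
            int r = DecodeVarint32(pending_.data() + pos,
                                   pending_.size() - pos, &size, &header);
            if (r < 0) return false;                           // malformed prefix
            if (r == 0) break;                                 // prefix incomplete
            if (pending_.size() - pos - header < size) break;  // payload incomplete
            handler_(pending_.data() + pos + header, size);
            pos += header + size;
        }
        // Keep only the unconsumed tail, length prefix included.
        pending_.erase(pending_.begin(),
                       pending_.begin() + static_cast<std::ptrdiff_t>(pos));
        return true;
    }

    std::size_t PendingBytes() const { return pending_.size(); }

private:
    // Returns 1 on success, 0 if more bytes are needed, -1 if malformed.
    static int DecodeVarint32(const char* p, std::size_t avail,
                              std::uint32_t* value, std::size_t* used) {
        std::uint32_t result = 0;
        for (std::size_t i = 0; i < 5; ++i) {
            if (i >= avail) return 0;
            std::uint8_t b = static_cast<std::uint8_t>(p[i]);
            result |= static_cast<std::uint32_t>(b & 0x7F) << (7 * i);
            if ((b & 0x80) == 0) {
                *value = result;
                *used = i + 1;
                return 1;
            }
        }
        return -1;
    }

    Handler handler_;
    std::vector<char> pending_;
};
```

With this shape, each read callback would only call `Feed(buf, nread)`, and the chunk size no longer needs to line up with message boundaries.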
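There is an API called GetDirectBufferPointer, so I tried using it to record the pointer position before reading the size of the next message. However, I suspect that because of some endianness weirdness, if I simply take the rest of the byte array starting at that pointer and prepend it to the next chunk, Parse will not succeed; in fact the first few bytes (8, I think) are completely messed up.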
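One possible explanation for the garbled leading bytes, offered as a guess rather than a diagnosis: `GetDirectBufferPointer(const void** data, int* size)` writes a pointer into `*data`. If the address of a `char` array is cast to `const void**` and passed in, the pointer value itself would be stored in the first `sizeof(void*)` bytes of that array (8 on a typical 64-bit build), which looks like corruption rather than an endianness problem. The sketch below shows the usual out-pointer pattern. `FakeStream` is a hypothetical stand-in that only mimics the shape of that call so the example runs without protobuf, and `CopyUnreadTail` is likewise an illustrative name.

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-in mimicking the shape of
// CodedInputStream::GetDirectBufferPointer(const void** data, int* size).
struct FakeStream {
    const char* buffer;
    int size;
    int position;

    bool GetDirectBufferPointer(const void** data, int* remaining) const {
        if (position >= size) return false;
        *data = buffer + position;       // hands back a pointer, copies nothing
        *remaining = size - position;
        return true;
    }
};

// Copies the unread tail of the stream into a fresh vector.
std::vector<char> CopyUnreadTail(const FakeStream& stream) {
    const void* ptr = nullptr;  // receives the pointer; NOT a destination array
    int n = 0;
    std::vector<char> tail;
    if (stream.GetDirectBufferPointer(&ptr, &n)) {
        assert(n >= 0);
        const char* bytes = static_cast<const char*>(ptr);
        tail.assign(bytes, bytes + n);   // the actual copy happens here
    }
    return tail;
}
```

Taking the tail this way before calling ReadVarint32 would keep the length prefix inside the carried-over bytes.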
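Alternatively, if I call codedStream.ReadRaw() and write the remaining stream into a buffer, then prepend that to the head of the new chunk, the data is not corrupted. The problem is that this time I lose the "size" bytes, because they were already consumed by ReadVarint32! Even if I just remember the size read last time and call message.ParseFromCodedStream() directly in the next iteration, it ends up reading one byte too few, some parts are even corrupted, and the object cannot be restored successfully.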
std::vector<char> mCheckBuffer;
std::vector<char> mResidueBuffer;
char bResidueBuffer[READ_BUFFER_SIZE];
char temp[READ_BUFFER_SIZE];
google::protobuf::uint32 size;
//"in" is the file input stream
while (in.good()) {
    in.read(mReadBuffer.data(), READ_BUFFER_SIZE);
    mCheckBuffer.clear();
    //merge the last remaining chunk that contains incomplete message with
    //the new data chunk I got out from buffer. Excuse my terrible C++ foo
    std::merge(mResidueBuffer.begin(), mResidueBuffer.end(),
               mReadBuffer.begin(), mReadBuffer.end(), std::back_inserter(mCheckBuffer));
    //Treat the new merged buffer array as the new CIS
    google::protobuf::io::ArrayInputStream ais(&mCheckBuffer[0],
                                               mCheckBuffer.size());
    google::protobuf::io::CodedInputStream cis(&ais);
    //Record the pointer location on CIS in bResidueBuffer
    cis.GetDirectBufferPointer((const void**)&bResidueBuffer,
                               &bResidueBufSize);
    //No size information, probably first time or last iteration
    //coincidentally read a complete message out. Otherwise I simply
    //skip reading size again as I've already populated that from last
    //iteration when I got an incomplete message
    if (size == 0) {
        cis.ReadVarint32(&size);
    }
    //Have to read this again to get remaining buffer size
    cis.GetDirectBufferPointer((const void**)&temp, &mResidueBufSize);
    //Compare the next message size with how much left in the buffer, if
    //message size is smaller, I know I can read at least one more message
    //out, keep reading until I run out of buffer, or, it's the end of message
    //and my buffer just allocated larger so size should be 0
    while (size <= mResidueBufSize && size != 0) {
        //If this cis I constructed didn't have the size info at the beginning,
        //and I just read straight from it hoping to get the message out from
        //the "size" I got from last iteration, it simply doesn't work
        //(read one less byte in fact, and some part of the message corrupted)
        //push the size constraint to the input stream;
        int limit = cis.PushLimit(size);
        //parse message from the input stream
        message.ParseFromCodedStream(&cis);
        cis.PopLimit(limit);
        google::protobuf::TextFormat::PrintToString(message, &str);
        printf("%s", str.c_str());
        //do something with the parsed object
        //Now I have to record the new pointer location again
        cis.GetDirectBufferPointer((const void**)&bResidueBuffer,
                                   &bResidueBufSize);
        //Read another time the next message's size and go back to while loop check
        cis.ReadVarint32(&size);
    }
    //If I do the next line, bResidueBuffer will have the correct CIS information
    //copied over, but not having the "already read" size info
    cis.ReadRaw(bResidueBuffer, bResidueBufSize);
    mResidueBuffer.clear();
    //I am constructing a new vector that receives the residual chunk of the
    //current buffer that isn't enough to restore a message
    //If I don't do ReadRaw, this copy completely messes up at least the first 8
    //bytes of the copied buffer's value, due to I suspect endianness
    mResidueBuffer.insert(mResidueBuffer.end(), &bResidueBuffer[0],
                          &bResidueBuffer[bResidueBufSize]);
}
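I am really stumped at this point. Is it possible to use protobuf gracefully with an API that requires a fixed-size intermediate buffer? Any input is much appreciated, thanks!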
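One detail in the listing above may matter on its own: `std::merge` is a sorted merge. It expects both input ranges to be sorted and interleaves their elements by comparison, so it does not place the residue in front of the new chunk as a plain concatenation would. A minimal sketch of the concatenation the comments seem to intend follows; `Concat` is a hypothetical helper name, not something from the original code or from protobuf.

```cpp
#include <cassert>
#include <vector>

// Hypothetical helper: returns residue followed by chunk, byte order preserved.
std::vector<char> Concat(const std::vector<char>& residue,
                         const std::vector<char>& chunk) {
    std::vector<char> out;
    out.reserve(residue.size() + chunk.size());
    out.insert(out.end(), residue.begin(), residue.end());  // leftover bytes first
    out.insert(out.end(), chunk.begin(), chunk.end());      // then the new chunk
    assert(out.size() == residue.size() + chunk.size());
    return out;
}
```

In the loop, `mCheckBuffer = Concat(mResidueBuffer, mReadBuffer);` would presumably take the place of the `std::merge` call. When the last read is short, only the bytes actually read (`in.gcount()`) should be appended.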