1

由于我对 C++ 的了解不足,我已经为此苦苦挣扎了两天。我需要做的是使用 protobuf C++ API 从一个大文件中解析消息序列,这个文件可能包含数百万条这样的消息。直接从文件中读取很容易,因为我总是可以执行“ReadVarInt32”来获取大小,然后执行 ParseFromCodedStream 并将限制推送到 CodedInputStream,如本文所述。但是,我正在使用的 I/O 级别 API(实际上是 libuv)需要为每个读取回调操作分配固定大小的缓冲区。显然,块大小与我正在读出的消息大小无关。

这让我的生活变得艰难。基本上每次我从文件中读取并填充固定大小的缓冲区(比如 16K)时,该缓冲区可能包含数百条完整的 protobuf 消息,但该缓冲区的最后一块可能是不完整的消息。所以我想,好吧,我应该做的是尝试尽可能多地读取消息,最后,提取最后一个块并将其附加到我读出的下一个 16K 缓冲区的开头,继续直到我达到 EOF文件。我使用 ReadVarInt32() 来获取大小,然后将该数字与缓冲区大小的其余部分进行比较,如果消息大小较小,则继续读取。

有一个名为GetDirectBufferPointer的 API,因此我在读出下一条消息的大小之前尝试使用它来记录指针位置。但是我怀疑由于字节顺序的怪异,如果我只是从指针开始的地方提取字节数组的其余部分并附加到下一个块,Parse 不会成功,实际上前几个字节(我认为是 8 个)完全搞砸了.

或者,如果我执行 codedStream.ReadRaw() 并将剩余流写入缓冲区,然后附加到新块的头部,则数据不会被破坏。但问题是这次我将丢失“大小”字节信息,因为它已经在“ReadVarInt32”中“读取”了!即使我只是继续记住上次读取的大小信息并直接调用下一次迭代 message.ParseFromCodedStream() ,它最终还是读取了一个字节,并且某些部分甚至损坏了,无法成功恢复对象。

std::vector<char> mCheckBuffer;
std::vector<char> mResidueBuffer;
char bResidueBuffer[READ_BUFFER_SIZE];
char temp[READ_BUFFER_SIZE];
google::protobuf::uint32 size;
//"in" is the file input stream
while (in.good()) {
    in.read(mReadBuffer.data(), READ_BUFFER_SIZE);
    mCheckBuffer.clear();
    //merge the last remaining chunk that contains incomplete message with
    //the new data chunk I got out from buffer. Excuse my terrible C++ foo
    std::merge(mResidueBuffer.begin(), mResidueBuffer.end(),  
    mReadBuffer.begin(), mReadBuffer.end(), std::back_inserter(mCheckBuffer));

    //Treat the new merged buffer array as the new CIS
    google::protobuf::io::ArrayInputStream ais(&mCheckBuffer[0], 
    mCheckBuffer.size());
    google::protobuf::io::CodedInputStream cis(&ais);
    //Record the pointer location on CIS in bResidueBuffer
    cis.GetDirectBufferPointer((const void**)&bResidueBuffer,
    &bResidueBufSize);

    //No size information, probably first time or last iteration  
    //coincidentally read a complete message out. Otherwise I simply 
    //skip reading size again as I've already populated that from last 
    //iteration when I got an incomplete message
    if(size == 0) {
         cis.ReadVarint32(&size);
    }
    //Have to read this again to get remaining buffer size
    cis.GetDirectBufferPointer((const void**)&temp, &mResidueBufSize);

    //Compare the next message size with how much left in the buffer, if      
    //message size is smaller, I know I can read at least one more message 
    //out, keep reading until I run out of buffer, or, it's the end of message 
    //and my buffer just allocated larger so size should be 0
    while (size <= mResidueBufSize && size != 0) {
        //If this cis I constructed didn't have the size info at the beginning, 
        //and I just read straight from it hoping to get the message out from 
        //the "size" I got from last iteration, it simply doesn't work
        //(read one less byte in fact, and some part of the message corrupted)
        //push the size constraint to the input stream;
        int limit = cis.PushLimit(size);
        //parse message from the input stream
        message.ParseFromCodedStream(&cis);  
        cis.PopLimit(limit);
        google::protobuf::TextFormat::PrintToString(message, &str);
        printf("%s", str.c_str());
        //do something with the parsed object
        //Now I have to record the new pointer location again
        cis.GetDirectBufferPointer((const void**)&bResidueBuffer, 
        &bResidueBufSize);
        //Read another time the next message's size and go back to while loop check
        cis.ReadVarint32(&size);

    }
    //If I do the next line, bResidueBuffer will have the correct CIS information 
    //copied over, but not having the "already read" size info
    cis.ReadRaw(bResidueBuffer, bResidueBufSize);
    mResidueBuffer.clear();
    //I am constructing a new vector that receives the residual chunk of the 
    //current buffer that isn't enough to restore a message
    //If I don't do ReadRaw, this copy completely messes up at least the first 8 
    //bytes of the copied buffer's value, due to I suspect endianness
    mResidueBuffer.insert(mResidueBuffer.end(), &bResidueBuffer[0], 
    &bResidueBuffer[bResidueBufSize]);
}

我现在真的很想不通。是否可以优雅地使用 protobuf 和需要固定大小的中间缓冲区的 API?非常感谢任何输入,谢谢!

4

2 回答 2

1

我发现您的代码存在两个主要问题:

std::merge(mResidueBuffer.begin(), mResidueBuffer.end(),  
mReadBuffer.begin(), mReadBuffer.end(), std::back_inserter(mCheckBuffer));

看起来您希望std::merge连接缓冲区,但实际上此函数执行将两个排序数组合并为一个合并排序意义上的排序数组。在这种情况下,这没有任何意义。mCheckBuffer 最终会包含废话。

cis.GetDirectBufferPointer((const void**)&bResidueBuffer,
&bResidueBufSize);

在这里,您正在转换&bResidueBuffer为不兼容的指针类型。bResidueBuffer是一个 char 数组,所以&bResidueBuffer是一个指向 char 数组的指针,它不是指向指针的指针。这无疑是令人困惑的,因为数组可以隐式转换为指针(指针指向数组的第一个元素),但这实际上是一种转换——bResidueBuffer它本身不是指针,它只能转换为一个指针。

我想你也误解了什么GetDirectBufferPointer()。看起来您希望它将缓冲区的其余部分复制到 中bResidueBuffer,但该方法从不复制任何数据。该方法返回一个指向原始缓冲区的指针。

调用它的正确方法是:

const void* ptr;
int size;
cis.GetDirectBufferPointer(&ptr, &size);

现在ptr将指向原始缓冲区。您现在可以将其与指向缓冲区开头的指针进行比较,以找出您在流中的位置,例如:

size_t pos = (const char*)ptr - &mCheckBuffer[0];

但是,你不应该那样做,因为CodedInputStream已经有了CurrentPosition()用于这个目的的方法。这将返回缓冲区中的当前字节偏移量。因此,请改用它。

于 2015-03-20T07:46:46.543 回答
0

好的,感谢 Kenton 帮助指出我的问题中的主要问题,我现在已经修改了代码片段并测试了它的工作原理。我将在这里发布我的解决方案。然而,话虽如此,我对我需要在这里做的所有复杂性和边缘情况检查感到不高兴。我认为这很容易出错。即使这样,我可能会真正做的是在我的 libuv 主线程之外的另一个线程中编写我的直接“从流中读取”阻塞调用,这样我就不需要使用 libuv API。但为了完整起见,这是我的代码:

std::vector<char> mCheckBuffer;
std::vector<char> mResidueBuffer;
std::vector<char> mReadBuffer(READ_BUFFER_SIZE);
google::protobuf::uint32 size;
//"in" is the file input stream
while (in.good()) {
    //This part is tricky as you're not guaranteed that what end up in 
    //mReadBuffer is everything you read out from the file. The same 
    //happens with libuv's assigned buffer, after EOF, what's rest in 
    //the buffer could be anything
    in.read(mReadBuffer.data(), READ_BUFFER_SIZE);
    //merge the last remaining chunk that contains incomplete message with
    //the new data chunk I got out from buffer. I couldn't find a more 
    //efficient way doing that
    mCheckBuffer.clear();
    mCheckBuffer.reserve(mResidueBuffer.size() + mReadBuffer.size());
    mCheckBuffer.insert(mCheckBuffer.end(), mResidueBuffer.begin(),
    mResidueBuffer.end());
    mCheckBuffer.insert(mCheckBuffer.end(), mReadBuffer.begin(),
    mReadBuffer.end());
    //Treat the new merged buffer array as the new CIS
    google::protobuf::io::ArrayInputStream ais(&mCheckBuffer[0], 
    mCheckBuffer.size());
    google::protobuf::io::CodedInputStream cis(&ais);
    //No size information, probably first time or last iteration  
    //coincidentally read a complete message out. Otherwise I simply 
    //skip reading size again as I've already populated that from last 
    //iteration when I got an incomplete message
    if(size == 0) {
        cis.ReadVarint32(&size);
    }
    bResidueBufSize = mCheckBuffer.size() - cis.CurrentPosition();
    //Compare the next message size with how much left in the buffer, if      
    //message size is smaller, I know I can read at least one more message 
    //out, keep reading until I run out of buffer. If, it's the end of message 
    //and size (next byte I read from stream) happens to be 0, that
    //will trip me up, cos when I push size 0 into PushLimit and then try 
    //parsing, it will actually return true even if it reads nothing. 
    //So I can get into an infinite loop, if I don't do the check here
    while (size <= bResidueBufSize && size != 0) {
        //If this cis I constructed didn't have the size info at the 
        //beginning, and I just read straight from it hoping to get the  
        //message out from the "size" I got from last iteration
        //push the size constraint to the input stream
        int limit = cis.PushLimit(size); 
        //parse the message from the input stream
        bool result = message.ParseFromCodedStream(&cis);  
        //Parse fail, it could be because last iteration already took care
        //of the last message and that size I read last time is just junk
        //I choose to only check EOF here when result is not true, (which
        //leads me to having to check for size=0 case above), cos it will
        //be too many checks if I check it everytime I finish reading a 
        //message out
        if(!result) {
            if(in.eof()) {
                log.info("Reached EOF, stop processing!");
                break;
            }
            else {
                log.error("Read error or input mal-formatted! Log error!");
                exit;
            }
        }
        cis.PopLimit(limit);
        google::protobuf::TextFormat::PrintToString(message, &str);
        //Do something with the message

        //This is when the last message read out exactly reach the end of 
        //the buffer and there is no size information available on the 
        //stream any more, in which case size will need to be reset to zero
        //so that the beginning of next iteration will read size info first
        if(!cis.ReadVarint32(&size)) {
            size = 0;
        }
        bResidueBufSize = mCheckBuffer.size() - cis.CurrentPosition();
    }
    if(in.eof()) {
        break;
    }
    //Now I am copying the residual buffer into the intermediate
    //mResidueBuffer, which will be merged with newly read data in next iteration
    mResidueBuffer.clear();
    mResidueBuffer.reserve(bResidueBufSize);
    mResidueBuffer.insert(mResidueBuffer.end(), 
    &mCheckBuffer[cis.CurrentPosition()],&mCheckBuffer[mCheckBuffer.size()]);
}
if(!in.eof()) {
    log.error("Something else other than EOF happened to the file, log error!");
    exit;
}
于 2015-03-21T00:42:53.540 回答