3

我正在使用 boost::asio 编写服务器应用程序。
我正在从用户读取已知数量的数据并将其写入具有以下 api 的数据接收器:

class Sink {
    ...
    void write(const char *data, size_t size);
}

数据量大,可以write(..)多次调用处理一个流。
在我的代码中,我想调用:

boost::asio::async_read(socket, sink_buffer,
    boost::asio::transfer_exactly(size), ...);

Sink是否可以用自定义包装,std::streambuf或者boost::asio::basic_streambuf它可以处理向其写入数据部分?

4

1 回答 1

1

对于作为 的缓冲区参数传递的对象async_read()缓冲区参数要么:

  • 需要是满足MutableBufferSequence要求的类型。
  • 成为一个boost::asio::basic_streambuf对象。

因此,可以编写一个Sink在读取发生时与对象交互的自定义​​类。但是,boost::asio::basic_streambuf似乎并没有被设计为用作基类。

如果Sink::write只是对底层内存的抽象,请考虑使用类似于 的方法basic_streambuf::prepare(),其中成员函数返回给定大小的缓冲区的句柄。底层内存实现仍将在mutable_buffer. 例如:

boost::asio::async_read( socket, sink.buffer( size ), ... );

如果Sink::write有业务逻辑,比如根据某些字节的值进行逻辑分支,那么可能需要传递一个中间缓冲区给async_read(). Sink::write()然后将在async_read()处理程序中完成对中间缓冲区的调用。例如:

void handle_read_into_sink( boost::system::error_code error,
                            std::size_t bytes_transferred,
                            boost::asio::ip::tcp::socket& socket,
                            Sink& sink,
                            char* buffer,
                            std::size_t buffer_size,
                            std::size_t bytes_remaining,
                            boost::function< void() > on_finish )
{
  sink.write( buffer, buffer_size );

  bytes_remaining -= bytes_transferred;
  // If there are more bytes remaining, then continue reading.
  if ( bytes_remaining )
  {
    read_into_sink( socket, sink, buffer, buffer_size,
                    bytes_remaining, on_finish );
  }
  // Otherwise, all data has been read.
  else
  {
    on_finish();
  }  
}

void read_into_sink( boost::asio::ip::tcp::socket& socket,
                     Sink& sink,
                     char* buffer,
                     std::size_t buffer_size,
                     std::size_t bytes_remaining,
                     boost::function< void() > on_finish )
{
  boost::asio::async_read(
    socket, boost::asio::buffer( buffer , buffer_size ),
    boost::asio::transfer_exactly( buffer_size ),
    boost::bind( handle_read_into_sink,
                 boost::asio::placeholders::error,
                 boost::asio::placeholders::bytes_transferred,
                 boost::ref( socket ),
                 boost::ref( sink ),
                 buffer,
                 buffer_size,
                 bytes_remaining,
                 on_finish ) );
}

并开始异步读取循环:

read_into_sink( socket, sink, small_buffer, sizeof_small_buffer, 
                total_stream_size, read_handler_callback );

确保根据您所需的逻辑检查和处理错误。

于 2012-06-20T16:07:17.467 回答