这个邮件列表帖子描述了同样的问题。虽然CreateFile
withFILE_FLAG_OVERLAPPED
允许异步 I/O,但它并没有在 Boost.Asio 的上下文中将其建立为流。对于流,Boost.Asio 实现read_some
的read_some_at
偏移量始终为0
. ReadFile()
正如文档所述,这是问题的根源:
对于支持字节偏移的文件,您必须指定开始从文件读取的字节偏移。
适应类型要求
Boost.Asio 写得很通用,通常需要参数来满足某种类型的要求,而不是特定的类型。因此,通常可以调整 I/O 对象或其服务以获得所需的行为。首先,必须确定适配的接口需要支持什么。在这种情况下,async_read_until
接受任何满足AsyncReadStream
. AsyncReadStream
的要求是相当基本的,需要一个void async_read_some(MutableBufferSequence, ReadHandler)
成员函数。
由于需要在整个组合async_read_until
操作中跟踪偏移值,因此可以引入一个满足 ReadHandler 要求的简单类型,它将包装应用程序的 ReadHandler,并相应地更新偏移量。
namespace detail {
/// @brief Handler to wrap asynchronous read_some_at operations.
template <typename Handler>
class read_some_offset_handler
{
public:
read_some_offset_handler(Handler handler, boost::uint64_t& offset)
: handler_(handler),
offset_(offset)
{}
void operator()(
const boost::system::error_code& error,
std::size_t bytes_transferred)
{
offset_ += bytes_transferred;
// If bytes were transferred, then set the error code as success.
// EOF will be detected on next read. This is to account for
// the read_until algorithm behavior.
const boost::system::error_code result_ec =
(error && bytes_transferred)
? make_error_code(boost::system::errc::success) : error;
handler_(result_ec, bytes_transferred);
}
//private:
Handler handler_;
boost::uint64_t& offset_;
};
/// @brief Hook that allows the wrapped handler to be invoked
/// within specific context. This is critical to support
/// composed operations being invoked within a strand.
template <typename Function,
typename Handler>
void asio_handler_invoke(
Function function,
detail::read_some_offset_handler<Handler>* handler)
{
boost_asio_handler_invoke_helpers::invoke(
function, handler->handler_);
}
} // namespace detail
该asio_handler_invoke
钩子将通过 ADL 找到,以支持在适当的上下文中调用用户处理程序。当在strand
. 有关组合操作和链的更多详细信息,请参阅此答案。
以下类将boost::asio::windows::random_access_handle
适应AsyncReadStream
.
/// @brief Adapts AsyncRandomAccessReadDevice to support AsyncReadStream.
template <typename AsyncRandomAccessReadDevice>
class basic_adapted_stream
: public AsyncRandomAccessReadDevice
{
public:
basic_adapted_stream(
boost::asio::io_service& io_service,
HANDLE handle
)
: AsyncRandomAccessReadDevice(io_service, handle),
offset_(0)
{}
template<typename MutableBufferSequence,
typename ReadHandler>
void async_read_some(
const MutableBufferSequence& buffers,
ReadHandler handler)
{
async_read_at(*this, offset_, buffers,
detail::read_some_offset_handler<ReadHandler>(handler, offset_));
}
private:
boost::uint64_t offset_;
};
或者,boost::asio::windows::basic_stream_handle
可以提供满足StreamHandleService类型要求的自定义类型,并async_read_some
按照async_read_some_at
.
/// @brief Service that implements async_read_some with async_read_some_at.
class offset_stream_handle_service
: public boost::asio::windows::stream_handle_service
{
private:
// The type of the platform-specific implementation.
typedef boost::asio::detail::win_iocp_handle_service service_impl_type;
public:
/// The unique service identifier.
static boost::asio::io_service::id id;
/// Construct a new stream handle service for the specified io_service.
explicit offset_stream_handle_service(boost::asio::io_service& io_service)
: boost::asio::windows::stream_handle_service(io_service),
service_impl_(io_service),
offset_(0)
{}
/// Start an asynchronous read.
template <typename MutableBufferSequence,
typename ReadHandler>
void
async_read_some(
implementation_type& impl,
const MutableBufferSequence& buffers,
ReadHandler handler)
{
// Implement async_read_some in terms of async_read_some_at. The provided
// ReadHandler will be hoisted in an internal handler so that offset_ can
// be properly updated.
service_impl_.async_read_some_at(impl, offset_, buffers,
detail::read_some_offset_handler<ReadHandler>(handler, offset_));
}
private:
// The platform-specific implementation.
service_impl_type service_impl_;
boost::uint64_t offset_;
};
boost::asio::io_service::id offset_stream_handle_service::id;
我在示例代码中选择了简单性,但是多个 I/O 对象将使用相同的服务。因此,offset_stream_handle_service
当多个 I/O 对象使用该服务时,需要管理每个处理程序的偏移量以正常运行。
要使用适配的类型,请将AsyncReader::input_handle
成员变量修改为basic_adapted_stream<boost::asio::windows::random_access_handle>
(适配的 I/O 对象)或boost::asio::windows::basic_stream_handle<offset_stream_handle_service>
(适配的服务)。
例子
以下是基于原代码的完整示例,仅修改了AsyncReader::input_handler
's 类型:
#include "stdafx.h"
#include <cassert>
#include <iostream>
#include <string>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <Windows.h>
#include <WinBase.h>
namespace detail {
/// @brief Handler to wrap asynchronous read_some_at operations.
template <typename Handler>
class read_some_offset_handler
{
public:
read_some_offset_handler(Handler handler, boost::uint64_t& offset)
: handler_(handler),
offset_(offset)
{}
void operator()(
const boost::system::error_code& error,
std::size_t bytes_transferred)
{
offset_ += bytes_transferred;
// If bytes were transferred, then set the error code as success.
// EOF will be detected on next read. This is to account for
// the read_until algorithm behavior.
const boost::system::error_code result_ec =
(error && bytes_transferred)
? make_error_code(boost::system::errc::success) : error;
handler_(result_ec, bytes_transferred);
}
//private:
Handler handler_;
boost::uint64_t& offset_;
};
/// @brief Hook that allows the wrapped handler to be invoked
/// within specific context. This is critical to support
/// composed operations being invoked within a strand.
template <typename Function,
typename Handler>
void asio_handler_invoke(
Function function,
detail::read_some_offset_handler<Handler>* handler)
{
boost_asio_handler_invoke_helpers::invoke(
function, handler->handler_);
}
} // namespace detail
/// @brief Adapts AsyncRandomAccessReadDevice to support AsyncReadStream.
template <typename AsyncRandomAccessReadDevice>
class basic_adapted_stream
: public AsyncRandomAccessReadDevice
{
public:
basic_adapted_stream(
boost::asio::io_service& io_service,
HANDLE handle
)
: AsyncRandomAccessReadDevice(io_service, handle),
offset_(0)
{}
template<typename MutableBufferSequence,
typename ReadHandler>
void async_read_some(
const MutableBufferSequence& buffers,
ReadHandler handler)
{
async_read_at(*this, offset_, buffers,
detail::read_some_offset_handler<ReadHandler>(handler, offset_));
}
private:
boost::uint64_t offset_;
};
/// @brief Service that implements async_read_some with async_read_some_at.
class offset_stream_handle_service
: public boost::asio::windows::stream_handle_service
{
private:
// The type of the platform-specific implementation.
typedef boost::asio::detail::win_iocp_handle_service service_impl_type;
public:
/// The unique service identifier.
static boost::asio::io_service::id id;
/// Construct a new stream handle service for the specified io_service.
explicit offset_stream_handle_service(boost::asio::io_service& io_service)
: boost::asio::windows::stream_handle_service(io_service),
service_impl_(io_service),
offset_(0)
{}
/// Start an asynchronous read.
template <typename MutableBufferSequence,
typename ReadHandler>
void
async_read_some(
implementation_type& impl,
const MutableBufferSequence& buffers,
ReadHandler handler)
{
// Implement async_read_some in terms of async_read_some_at. The provided
// ReadHandler will be hoisted in an internal handler so that offset_ can
// be properly updated.
service_impl_.async_read_some_at(impl, offset_, buffers,
detail::read_some_offset_handler<ReadHandler>(handler, offset_));
}
private:
// The platform-specific implementation.
service_impl_type service_impl_;
boost::uint64_t offset_;
};
boost::asio::io_service::id offset_stream_handle_service::id;
#ifndef ADAPT_IO_SERVICE
typedef basic_adapted_stream<
boost::asio::windows::random_access_handle> adapted_stream;
#else
typedef boost::asio::windows::basic_stream_handle<
offset_stream_handle_service> adapted_stream;
#endif
class AsyncReader
{
public:
AsyncReader(boost::asio::io_service& io_service, HANDLE handle)
: io_service_(io_service),
input_buffer(/*size*/ 8192),
input_handle(io_service, handle)
{
start_read();
}
void start_read()
{
boost::asio::async_read_until(input_handle, input_buffer, '\n',
boost::bind(&AsyncReader::handle_read, this,
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred));
}
void handle_read(const boost::system::error_code& error, std::size_t length);
// void handle_write(const boost::system::error_code& error);
private:
boost::asio::io_service& io_service_;
boost::asio::streambuf input_buffer;
adapted_stream input_handle;
};
void AsyncReader::handle_read(const boost::system::error_code& error, std::size_t length)
{
if (!error)
{
static int count = 0;
++count;
// method 1: (same problem)
// const char* pStart = boost::asio::buffer_cast<const char*>(input_buffer.data());
// std::string s(pStart, length);
// input_buffer.consume(length);
// method 2:
std::istream is(&input_buffer);
std::string s;
assert(std::getline(is, s));
std::cout << "line #" << count << ", length " << length << ", getline() [" << s.size() << "] '" << s << "'\n";
start_read();
}
else if (error == boost::asio::error::not_found)
std::cerr << "Did not receive ending character!\n";
else
std::cerr << "Misc error during read!\n";
}
int _tmain(int argc, _TCHAR* argv[])
{
boost::asio::io_service io_service;
HANDLE handle = ::CreateFile(TEXT("c:/temp/input.txt"),
GENERIC_READ,
0, // share mode
NULL, // security attribute: NULL = default
OPEN_EXISTING, // creation disposition
FILE_FLAG_OVERLAPPED,
NULL // template file
);
AsyncReader obj(io_service, handle);
io_service.run();
std::cout << "Normal termination\n";
getchar();
return 0;
}
使用原始问题的输入时会产生以下输出:
第 1 行,长度 70,getline() [69] '第 1 行 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
第 2 行,长度 70,getline() [69] '第 2 行 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
第 3 行,长度 70,getline() [69] '第 3 行 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
第 4 行,长度 70,getline() [69] '第 4 行 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
第 5 行,长度 70,getline() [69] '第 5 行 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
第 6 行,长度 70,getline() [69] '第 6 行 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
第 7 行,长度 70,getline() [69] '第 7 行 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
第 8 行,长度 70,getline() [69] '第 8 行 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
第 9 行,长度 70,getline() [69] '第 9 行 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
第 10 行,长度 70,getline() [69] '第 0 行 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
第 11 行,长度 70,getline() [69] 'LINE A abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
第 12 行,长度 70,getline() [69] 'LINE B abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
第 13 行,长度 70,getline() [69] 'LINE C abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
第 14 行,长度 70,getline() [69] 'LINE D abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
第 15 行,长度 70,getline() [69] 'LINE E abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
读取过程中出现其他错误!
正常终止
\n
我的输入文件在 LINE F 的末尾没有字符。因此,AsyncReader::handle_read()
被调用时出现错误,boost::asio::error::eof
并且input_buffer
的内容包含 LINE F。修改最后的 else 情况以打印更多信息后:
...
else
{
std::cerr << "Error: " << error.message() << "\n";
if (std::size_t buffer_size = input_buffer.size())
{
boost::asio::streambuf::const_buffers_type bufs = input_buffer.data();
std::string contents(boost::asio::buffers_begin(bufs),
boost::asio::buffers_begin(bufs) + buffer_size);
std::cerr << "stream contents: '" << contents << "'\n";
}
}
我得到以下输出:
第 1 行,长度 70,getline() [69] '第 1 行 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
第 2 行,长度 70,getline() [69] '第 2 行 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
第 3 行,长度 70,getline() [69] '第 3 行 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
第 4 行,长度 70,getline() [69] '第 4 行 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
第 5 行,长度 70,getline() [69] '第 5 行 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
第 6 行,长度 70,getline() [69] '第 6 行 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
第 7 行,长度 70,getline() [69] '第 7 行 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
第 8 行,长度 70,getline() [69] '第 8 行 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
第 9 行,长度 70,getline() [69] '第 9 行 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
第 10 行,长度 70,getline() [69] '第 0 行 abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
第 11 行,长度 70,getline() [69] 'LINE A abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
第 12 行,长度 70,getline() [69] 'LINE B abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
第 13 行,长度 70,getline() [69] 'LINE C abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
第 14 行,长度 70,getline() [69] 'LINE D abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
第 15 行,长度 70,getline() [69] 'LINE E abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
错误:文件结束
流内容:'LINE F abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
正常终止