5

我正在尝试编写一个包装器同步方法async_read以允许在套接字上进行非阻塞读取。按照互联网上的几个示例,我开发了一个似乎几乎正确但不起作用的解决方案。

该类声明了这些相关的属性和方法:

class communications_client
{
    protected:
        boost::shared_ptr<boost::asio::io_service> _io_service;
        boost::shared_ptr<boost::asio::ip::tcp::socket> _socket;
        boost::array<boost::uint8_t, 128> _data;

        boost::mutex _mutex;
        bool _timeout_triggered;
        bool _message_received;
        boost::system::error_code _error;
        size_t _bytes_transferred;

        void handle_read(const boost::system::error_code & error, size_t bytes_transferred);
        void handle_timeout(const boost::system::error_code & error);
        size_t async_read_helper(unsigned short bytes_to_transfer, const boost::posix_time::time_duration & timeout, boost::system::error_code & error);

        ...
}

该方法async_read_helper是封装所有复杂性的方法,而其他两个handle_read只是handle_timeout事件处理程序。下面是这三种方法的实现:

void communications_client::handle_timeout(const boost::system::error_code & error)
{
    if (!error)
    {
        _mutex.lock();
        _timeout_triggered = true;
        _error.assign(boost::system::errc::timed_out, boost::system::system_category());
        _mutex.unlock();
    }
}

void communications_client::handle_read(const boost::system::error_code & error, size_t bytes_transferred)
{
    _mutex.lock();
    _message_received = true;
    _error = error;
    _bytes_transferred = bytes_transferred;
    _mutex.unlock();
}

size_t communications_client::async_read_helper(unsigned short bytes_to_transfer, const boost::posix_time::time_duration & timeout, boost::system::error_code & error)
{
    _timeout_triggered = false;
    _message_received = false;

    boost::asio::deadline_timer timer(*_io_service);
    timer.expires_from_now(timeout);
    timer.async_wait(
        boost::bind(
            &communications_client::handle_timeout,
            this,
            boost::asio::placeholders::error));

    boost::asio::async_read(
        *_socket,
        boost::asio::buffer(_data, 128),
        boost::asio::transfer_exactly(bytes_to_transfer),
        boost::bind(
            &communications_client::handle_read,
            this,
            boost::asio::placeholders::error,
            boost::asio::placeholders::bytes_transferred));

    while (true)
    {
        _io_service->poll_one();
        if (_message_received)
        {
            timer.cancel();
            break;
        }
        else if (_timeout_triggered)
        {
            _socket->cancel();
            break;
        }
    }

    return _bytes_transferred;
}

我的主要问题是:为什么这适用于循环_io_service->poll_one()而没有循环和调用_io_service->run_one()?另外,我想知道对于更习惯使用 Boost 和 Asio 的人来说,它是否正确。谢谢!


修复提案 #1

根据Jonathan Wakely所做的评论,可以在操作完成后使用_io_service->run_one()调用来替换循环。_io_service->reset()它应该看起来像:

_io_service->run_one();
if (_message_received)
{
    timer.cancel();
}
else if (_timeout_triggered)
{
    _socket->cancel();
}

_io_service->reset();

经过一些测试,我已经检查过这种解决方案本身是行不通的。使用错误代码连续调用该handle_timeout方法operation_aborted。如何停止这些呼叫?

修复提案 #2

twsansbury的答案是准确的,并且基于可靠的文档基础。该实现导致以下代码async_read_helper

while (_io_service->run_one())
{
    if (_message_received)
    {
        timer.cancel();
    }
    else if (_timeout_triggered)
    {
        _socket->cancel();
    }
}
_io_service->reset();

以及对方法的以下更改handle_read

void communications_client::handle_read(const boost::system::error_code & error, size_t bytes_transferred)
{
    if (error != boost::asio::error::operation_aborted)
    {
        ...
    }
}

该解决方案在测试过程中被证明是可靠和正确的。

4

1 回答 1

8

io_service::run_one()和之间的主要区别在于io_service::poll_one()它将run_one()阻塞直到处理程序准备好运行,而poll_one()不会等待任何未完成的处理程序准备好。

假设唯一未完成的处理程序_io_servicehandle_timeout()handle_read(),则run_one()不需要循环,因为它只会返回一次handle_timeout()或者handle_read()已经运行。另一方面,poll_one()需要一个循环,因为poll_one()将立即返回,因为既没有handle_timeout()也没有handle_read()准备好运行,导致函数最终返回。

原始代码以及修复建议 #1 的主要问题是async_read_helper()返回时 io_service 中仍有未完成的处理程序。在下一次调用 时async_read_helper(),要调用的下一个处理程序将是上一次调用的处理程序。该io_service::reset()方法只允许 io_service 从停止状态恢复运行,它不会删除任何已排队进入 io_service 的处理程序。要解决此问题,请尝试使用循环来使用 io_service 中的所有处理程序。使用完所有处理程序后,退出循环并重置 io_service:

// Consume all handlers.
while (_io_service->run_one())
{
  if (_message_received)
  {
    // Message received, so cancel the timer.  This will force the completion of
    // handle_timer, with boost::asio::error::operation_aborted as the error.
    timer.cancel();
  }
  else if (_timeout_triggered)
  {
    // Timeout occured, so cancel the socket.  This will force the completion of
    // handle_read, with boost::asio::error::operation_aborted as the error.
    _socket->cancel();
  }
}

// Reset service, guaranteeing it is in a good state for subsequent runs.
_io_service->reset();

从调用者的角度来看,这种形式的超时是同步的run_one()。但是,I/O 服务的工作仍在进行中。另一种方法是使用 Boost.Asio对 C++ 期货的支持来等待未来并执行超时。这段代码更容易阅读,但它至少需要一个其他线程来处理 I/O 服务,因为等待超时的线程不再处理 I/O 服务:

// Use an asynchronous operation so that it can be cancelled on timeout.
std::future<std::size_t> read_result = boost::asio::async_read(
    socket, buffer, boost::asio::use_future);

// If timeout occurs, then cancel the operation.
if (read_result.wait_for(std::chrono::seconds(1)) == 
    std::future_status::timeout)
{
  socket.cancel();
}
// Otherwise, the operation completed (with success or error).
else
{
  // If the operation failed, then on_read.get() will throw a
  // boost::system::system_error.
  auto bytes_transferred = read_result.get();
  // process buffer
}
于 2012-06-03T05:09:09.220 回答