16

我的问题如下。我异步启动了几个操作,我想继续直到所有操作都完成。使用 Boost Asio,最直接的方法如下。假设tasks是某种支持异步操作的对象容器。

tasksToGo = tasks.size();
for (auto task: tasks) {
    task.async_do_something([](const boost::system::error_code& ec)
    {
        if (ec) {
            // handle error
        } else {
            if (--taslsToGo == 0) {
                 tasksFinished();
            }
        }
    });
}

这个解决方案的问题在于它感觉像是一种解决方法。在 Boost 1.54 中,我可以使用期货来做到这一点,但我只能同步等待,这只能从与run()调用位置分开的线程中实现。

for (auto task: tasks) {
    futures.push_back(task.async_do_something(boost::asio::use_future));
}

for (auto future: futures) {
    future.wait();
}

这段代码比前一个更清晰,但我需要一个我不想要的单独线程。我想要可以像这样使用的东西:

for (auto task: tasks) {
    futures.push_back(task.async_do_something(boost::asio::use_future));
}

boost::asio::spawn(ioService, [](boost::asio::yield_context yield)
{
    for (auto future: futures) {
        future.async_wait(yield);
    }
    tasksFinished();

}

有什么可以类似使用的吗?

4

2 回答 2

20

据我所知,目前还没有一流的支持。但是,考虑到库的方向,如果将来无法使用此功能,我会感到惊讶。

已经提出了一些论文来增加对此类功能的支持:

  • N3558 - 异步操作的标准化表示特别有趣。它提出when_all(futures)future.next()。如果实现了,则可以将异步链表示为:

    for (auto task: tasks) {
        futures.push_back(task.async_do_something(boost::asio::use_future));
    }
    when_all(futures).then(&tasksFinished);
    
  • N3562 - 执行器和调度器引入了执行器。这可用于对 anasync可以执行的上下文提供更好的控制。对于 Boost.Asio,这可能需要提供某种类型的执行器,它遵循io_service.

虽然这些论文仍在进行中,但定期检查 Boost.Thread 的一致性和扩展页面和 Boost.Asio 的github以了解这些建议的早期改编可能是值得的。


一年前,我需要使用更早版本的 Boost 来实现此功能,因此我开发了自己的解决方案。在语义方面仍有一些粗糙的地方,但在正式采用之前作为参考资料可能会有所帮助。在我提供代码之前,这是一个基于您的问题的示例应用程序:

#include <iostream>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include "async_ops.hpp"

void handle_timer(const boost::system::error_code& error, int x)
{
  std::cout << "in handle timer: " << x << " : "
            << error.message() <<  std::endl;
}

void a() { std::cout << "a" << std::endl; }
void b() { std::cout << "b" << std::endl; }
void c() { std::cout << "c" << std::endl; }

int main()
{
  boost::asio::io_service io_service;
  boost::asio::deadline_timer timer1(io_service);
  boost::asio::deadline_timer timer2(io_service);

  // Create a chain that will continue once 2 handlers have been executed.
  chain all_expired = when_all(io_service, 2);

  all_expired.then(&a)  // Once 2 handlers finish, run a within io_service.
             .then(&b)  // Once a has finished, run b within io_service.
             .then(&c); // Once b has finished, run c within io_service.

  // Set expiration times for timers.
  timer1.expires_from_now(boost::posix_time::seconds(2));
  timer2.expires_from_now(boost::posix_time::seconds(5));

  // Asynchrnously wait for the timers, wrapping the handlers with the chain.
  timer1.async_wait(all_expired.wrap(
      boost::bind(&handle_timer, boost::asio::placeholders::error, 1)));
  timer2.async_wait(all_expired.wrap(
      boost::bind(&handle_timer, boost::asio::placeholders::error, 2)));

  // Run the io_service.
  io_service.run();
}

产生以下输出:

in handle timer: 1 : Success
in handle timer: 2 : Success
a
b
c

这是async_ops.hpp

#include <vector>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/bind/protect.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/foreach.hpp>
#include <boost/function.hpp>
#include <boost/make_shared.hpp>
#include <boost/range/iterator_range.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread/locks.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/type_traits/is_integral.hpp>
#include <boost/type_traits/remove_reference.hpp>
#include <boost/utility/enable_if.hpp>

class chain;

namespace detail {

/// @brief Chained handler connects two handlers together that will
///        be called sequentially.
///
/// @note Type erasure is not performed on Handler1 to allow resolving
///       to the correct asio_handler_invoke via ADL.
template <typename Handler1> 
class chained_handler
{
public:
  template <typename Handler2>
  chained_handler(Handler1 handler1, Handler2 handler2)
    : handler1_(handler1),
      handler2_(handler2)
  {}

  void operator()()
  {
    handler1_();
    handler2_();
  }

  template <typename Arg1>
  void operator()(const Arg1& a1)
  {
    handler1_(a1);
    handler2_();
  }

  template <typename Arg1, typename Arg2>
  void operator()(const Arg1& a1, const Arg2& a2)
  {
    handler1_(a1, a2);
    handler2_();
  }

//private:
  Handler1 handler1_;
  boost::function<void()> handler2_;
};

/// @brief Hook that allows the sequential_handler to be invoked
///        within specific context based on the hander's type.
template <typename Function,
          typename Handler>
void asio_handler_invoke(
  Function function,
  chained_handler<Handler>* handler)
{
  boost_asio_handler_invoke_helpers::invoke(
    function, handler->handler1_);
}

/// @brief No operation.
void noop() {}

/// @brief io_service_executor is used to wrap handlers, providing a
///        deferred posting to an io_service.  This allows for chains
///        to inherit io_services from other chains.
class io_service_executor
  : public boost::enable_shared_from_this<io_service_executor>
{
public:
  /// @brief Constructor.
  explicit
  io_service_executor(boost::asio::io_service* io_service)
    : io_service_(io_service)
  {}

  /// @brief Wrap a handler, returning a functor that will post the
  ///        provided handler into the io_service.
  ///
  /// @param handler Handler to be wrapped for deferred posting.
  /// @return Functor that will post handler into io_service.
  template <typename Handler>
  boost::function<void()> wrap(Handler handler)
  {
    // By binding to the io_service_exectuer's post, the io_service
    // into which the handler can be posted can be specified at a later 
    // point in time.
    return boost::bind(&io_service_executor::post<Handler>,
                       shared_from_this(), handler);
  }

  /// @brief Set the io_service.
  void io_service(boost::asio::io_service* io_service)
  {
    io_service_ = io_service;
  }

  /// @brief Get the io_service.
  boost::asio::io_service* io_service()
  {
    return io_service_;
  }

private:

  /// @brief Post handler into the io_service.
  ///
  /// @param handler The handler to post.
  template <typename Handler>
  void post(Handler handler)
  {
    io_service_->post(handler);
  }

private:
  boost::asio::io_service* io_service_;
};

/// @brief chain_impl is an implementation for a chain.  It is responsible
///        for lifetime management, tracking posting and wrapped functions,
///        as well as determining when run criteria has been satisfied.
class chain_impl
  : public boost::enable_shared_from_this<chain_impl>
{
public:

  /// @brief Constructor.
  chain_impl(boost::shared_ptr<io_service_executor> executor,
             std::size_t required)
    : executor_(executor),
      required_(required)
  {}

  /// @brief Destructor will invoke all posted handlers.
  ~chain_impl()
  {
    run();
  }

  /// @brief Post a handler that will be posted into the executor 
  ///        after run criteria has been satisfied.
  template <typename Handler>
  void post(const Handler& handler)
  {
    deferred_handlers_.push_back(executor_->wrap(handler));
  }

  /// @brief Wrap a handler, returning a chained_handler.  The returned
  ///        handler will notify the impl when it has been invoked.
  template <typename Handler>
  chained_handler<Handler> wrap(const Handler& handler)
  {
    return chained_handler<Handler>(
      handler,                                                 // handler1
      boost::bind(&chain_impl::complete, shared_from_this())); // handler2
  }

  /// @brief Force run of posted handlers.
  void run()
  {
    boost::unique_lock<boost::mutex> guard(mutex_);
    run(guard);
  }

  /// @brief Get the executor.
  boost::shared_ptr<io_service_executor> executor() { return executor_; }

private:

  /// @brief Completion handler invoked when a wrapped handler has been
  ///        invoked.
  void complete()
  {
    boost::unique_lock<boost::mutex> guard(mutex_);

    // Update tracking.
    if (required_)
      --required_;

    // If criteria has not been met, then return early.
    if (required_) return;

    // Otherwise, run the handlers.
    run(guard);    
  }

  /// @brief Run handlers.
  void run(boost::unique_lock<boost::mutex>& guard)
  {
    // While locked, swap handlers into a temporary.
    std::vector<boost::function<void()> > handlers;
    using std::swap;
    swap(handlers, deferred_handlers_);

    // Run handlers without mutex.
    guard.unlock();
    BOOST_FOREACH(boost::function<void()>& handler, handlers)
        handler();
    guard.lock();
  }

private:
  boost::shared_ptr<io_service_executor> executor_;
  boost::mutex mutex_;
  std::size_t required_;
  std::vector<boost::function<void()> > deferred_handlers_;
};

/// @brief Functor used to wrap and post handlers or chains between two
///        implementations.
struct wrap_and_post
{
  wrap_and_post(
    boost::shared_ptr<detail::chain_impl> current,
    boost::shared_ptr<detail::chain_impl> next
  )
    : current_(current),
      next_(next)
  {}

  /// @brief Wrap a handler with next, then post into current.
  template <typename Handler>
  void operator()(Handler handler)
  {
    // Wrap the handler with the next implementation, then post into the
    // current.  The wrapped handler will keep next alive, and posting into
    // current will cause next::complete to be invoked when current is ran.
    current_->post(next_->wrap(handler));
  }

  /// @brief Wrap an entire chain, posting into the current.
  void operator()(chain chain);

private:
  boost::shared_ptr<detail::chain_impl> current_;
  boost::shared_ptr<detail::chain_impl> next_;
};

} // namespace detail

/// @brief Used to indicate that the a chain will inherit its service from an
///        outer chain.
class inherit_service_type {};
inherit_service_type inherit_service;

/// @brief Chain represents an asynchronous call chain, allowing the overall
///        chain to be constructed in a verbose and explicit manner.
class chain
{
public:

  /// @brief Constructor.
  ///
  /// @param io_service The io_service in which the chain will run.
  explicit
  chain(boost::asio::io_service& io_service)
    : impl_(boost::make_shared<detail::chain_impl>(
              boost::make_shared<detail::io_service_executor>(&io_service),
              0)),
      root_impl_(impl_)
  {}

  /// @brief Constructor.  The chain will inherit its io_service from an
  ///        outer chain.
  explicit
  chain(inherit_service_type)
    : impl_(boost::make_shared<detail::chain_impl>(
              boost::make_shared<detail::io_service_executor>(
                static_cast<boost::asio::io_service*>(NULL)),
              0)),
      root_impl_(impl_)
  {}

  /// @brief Force run posted handlers.
  void run()
  {
    root_impl_->run();
  }

  /// @brief Chain link that will complete when the amount of wrapped
  ///        handlers is equal to required.
  ///
  /// @param required The amount of handlers required to be complete.
  template <typename T>
  typename boost::enable_if<boost::is_integral<
    typename boost::remove_reference<T>::type>, chain>::type
  any(std::size_t required = 1)
  {
    return chain(root_impl_, required);
  }

  /// @brief Chain link that wraps all handlers in container, and will
  ///        be complete when the amount of wrapped handlers is equal to
  ///        required.
  ///
  /// @param Container of handlers to wrap.
  /// @param required The amount of handlers required to be complete.
  template <typename Container>
  typename boost::disable_if<boost::is_integral<
    typename boost::remove_reference<Container>::type>, chain>::type
  any(const Container& container, 
      std::size_t required = 1)
  {
    return post(container, required);
  }

  /// @brief Chain link that wraps all handlers in iterator range, and will
  ///        be complete when the amount of wrapped handlers is equal to
  ///        required.
  ///
  /// @param Container of handlers to wrap.
  /// @param required The amount of handlers required to be complete.
  template <typename Iterator>
  chain any(Iterator begin, Iterator end,
            std::size_t required = 1)
  {
    return any(boost::make_iterator_range(begin, end), required);
  }

  /// @brief Chain link that will complete when the amount of wrapped
  ///        handlers is equal to required.
  ///
  /// @param required The amount of handlers required to be complete.
  template <typename T>
  typename boost::enable_if<boost::is_integral<
    typename boost::remove_reference<T>::type>, chain>::type
  all(T required)
  {
    return any<T>(required);
  }

  /// @brief Chain link that wraps all handlers in container, and will
  ///        be complete when all wrapped handlers from the container 
  ///        have been executed.
  ///
  /// @param Container of handlers to wrap.
  template <typename Container>
  typename boost::disable_if<boost::is_integral<
    typename boost::remove_reference<Container>::type>, chain>::type
  all(const Container& container)
  {
    return any(container, container.size());
  }

  /// @brief Chain link that wraps all handlers in iterator range, and will
  ///        be complete when all wrapped handlers from the iterator range
  ///        have been executed.
  ///
  /// @param Container of handlers to wrap.
  template <typename Iterator>
  chain all(Iterator begin, Iterator end)
  {
    return all(boost::make_iterator_range(begin, end));
  }

  /// @brief Chain link that represents a single sequential link.
  template <typename Handler>
  chain then(const Handler& handler)
  {
    boost::array<Handler, 1> handlers = {{handler}};
    return all(handlers);
  }

  /// @brief Wrap a handler, returning a chained_handler.
  template <typename Handler>
  detail::chained_handler<Handler> wrap(const Handler& handler)
  {
    return impl_->wrap(handler);
  }

  /// @brief Set the executor.
  void executor(boost::asio::io_service& io_service)
  {
    impl_->executor()->io_service(&io_service);
  }

  /// @brief Check if this chain should inherit its executor.
  bool inherits_executor()
  {
    return !impl_->executor()->io_service();
  }

private:

  /// @brief Private constructor used to create links in the chain.
  ///
  /// @note All links maintain a handle to the root impl.  When constructing a
  ///       chain, this allows for links later in the chain to be stored as
  ///       non-temporaries.
  chain(boost::shared_ptr<detail::chain_impl> root_impl,
        std::size_t required)
    : impl_(boost::make_shared<detail::chain_impl>(
              root_impl->executor(), required)),
      root_impl_(root_impl)
  {}

  /// @brief Create a new chain link, wrapping handlers and posting into
  ///        the current chain.
  template <typename Container>
  chain post(const Container& container,
             std::size_t required)
  {
    // Create next chain.
    chain next(root_impl_, required);

    // Wrap handlers from the next chain, and post into the current chain.
    std::for_each(container.begin(), container.end(),
                  detail::wrap_and_post(impl_, next.impl_));

    return next;
  }

private:
  boost::shared_ptr<detail::chain_impl> impl_;
  boost::shared_ptr<detail::chain_impl> root_impl_;
};

void detail::wrap_and_post::operator()(chain c)
{
  // If next does not have an executor, then inherit from current.
  if (c.inherits_executor())
      c.executor(*current_->executor()->io_service());

  // When current completes, start the chain.
  current_->post(boost::protect(boost::bind(&chain::run, c)));

  // The next impl needs to be aware of when the chain stops, so
  // wrap a noop and append it to the end of the chain.
  c.then(next_->wrap(&detail::noop));  
}

// Convenience functions.
template <typename T, typename Handler>
chain async(T& t, const Handler& handler)
{
  return chain(t).then(handler);
}

template <typename T,
          typename Container>
chain when_all(T& t, const Container& container)
{
  return chain(t).all(container);
}

template <typename T,
          typename Iterator>
chain when_all(T& t, Iterator begin, Iterator end)
{
  return chain(t).all(begin, end);
}

template <typename T,
          typename Container>
chain when_any(T& t, const Container& container)
{
  return chain(t).any(container);
}

template <typename T,
          typename Iterator>
chain when_any(T& t, Iterator begin, Iterator end)
{
  return chain(t).any(begin, end);
}

以下是使用上述代码和两个线程的一些基本示例。我的符号:

  • a -> ba然后表示b
  • (a | b)表示ab。因此(a | b) -> c暗示当或者a或者b完成时,然后运行c
  • (a & b)表示ab。因此(a & b) -> c意味着当两者都a完成时b,然后运行c

在每种情况之前,我都会打印链的符号。此外,每个函数在进入时会打印一个大写字母,在退出时会打印一个小写字母。

#include <iostream>
#include <boost/asio.hpp>
#include <boost/assign.hpp>
#include <boost/thread.hpp>
#include "async_ops.hpp"

/// @brief Print identifiers when entering and exiting scope,
///        sleeping between.
void print_and_sleep(char id, unsigned int sleep_time)
{
  std::cout << char(toupper(id));
  boost::this_thread::sleep_for(boost::chrono::milliseconds(sleep_time));
  std::cout << char(tolower(id));
  std::cout.flush();
}

/// @brief Convenience function to create functors.
boost::function<void()> make_fn(char id, unsigned int sleep_time)
{
  return boost::bind(&print_and_sleep, id, sleep_time);  
}

/// @brief Run an io_service with multiple threads.
void run_service(boost::asio::io_service& io_service)
{
  boost::thread_group threads;
  threads.create_thread(boost::bind(
    &boost::asio::io_service::run, &io_service));
  io_service.run();
  threads.join_all();
}

int main()
{
  boost::function<void()> a = make_fn('a', 500);
  boost::function<void()> b = make_fn('b', 1000);
  boost::function<void()> c = make_fn('c', 500);
  boost::function<void()> d = make_fn('d', 1000);
  boost::function<void()> e = make_fn('e', 500);

  {
    std::cout << "a -> b -> c\n"
                 "  ";
    boost::asio::io_service io_service;
    async(io_service, a)
      .then(b)
      .then(c);
    run_service(io_service);
    std::cout << std::endl;
  }

  {
    std::cout << "(a & b) -> c\n"
                 "  ";
    boost::asio::io_service io_service;
    when_all(io_service, boost::assign::list_of(a)(b))
      .then(c); 
    run_service(io_service);
    std::cout << std::endl;
  }

  {
    std::cout << "(a | b) -> c\n"
                 "  ";
    boost::asio::io_service io_service;
    when_any(io_service, boost::assign::list_of(a)(b))
      .then(c); 
    run_service(io_service);
    std::cout << std::endl;
  }

  {
    std::cout << "(a & b) -> (c & d)\n"
                 "  ";
    boost::asio::io_service io_service;
    when_all(io_service, boost::assign::list_of(a)(b))
      .all(boost::assign::list_of(c)(d));
    run_service(io_service);
    std::cout << std::endl;
  }

  {
    std::cout << "(a & b) -> c -> (d & e)\n"
                 "  ";
    boost::asio::io_service io_service;
    when_all(io_service, boost::assign::list_of(a)(b))
      .then(c)
      .all(boost::assign::list_of(d)(e));
    run_service(io_service);
    std::cout << std::endl;
  }

  std::cout << "(a & b) -> (c & d) -> e\n"
               "  ";
  {
    boost::asio::io_service io_service;
    when_all(io_service, boost::assign::list_of(a)(b))
      .all(boost::assign::list_of(c)(d))
      .then(e);
    run_service(io_service);
    std::cout << std::endl;
  }

  std::cout << "(a | b) -> (c | d) -> e\n"
               "  ";
  {
    boost::asio::io_service io_service;
    when_any(io_service, boost::assign::list_of(a)(b))
      .any(boost::assign::list_of(c)(d))
      .then(e);
    run_service(io_service);
    std::cout << std::endl;
  }

  std::cout << "(a | b) -> (c & d) -> e\n"
               "  ";
  {
    boost::asio::io_service io_service;
    when_any(io_service, boost::assign::list_of(a)(b))
      .all(boost::assign::list_of(c)(d))
      .then(e);
    run_service(io_service);
    std::cout << std::endl;
  }

  {
    std::cout << "a -> ((b -> d) | c) -> e\n"
                 "  ";
    boost::asio::io_service io_service;
    async(io_service, a)
      .any(boost::assign::list_of
           (async(io_service, b).then(d))
           (async(inherit_service, c)))
      .then(e);
    run_service(io_service);
    std::cout << std::endl;
  }
}

产生以下输出:

a -> b -> c
  AaBbCc
(a & b) -> c
  ABabCc
(a | b) -> c
  ABaCbc
(a & b) -> (c & d)
  ABabCDcd
(a & b) -> c -> (d & e)
  ABabCcDEed
(a & b) -> (c & d) -> e
  ABabCDcdEe
(a | b) -> (c | d) -> e
  ABaCbDcEed
(a | b) -> (c & d) -> e
  ABaCbDcdEe
a -> ((b -> d) | c) -> e
  AaBCcEbDed
于 2013-08-21T17:45:56.260 回答
-1

如何使用工作概念?只要工作可用, io_service::run 就会运行,一旦没有未完成的任务,就会在工作被删除时终止。

在调用 run 之前,您创建一个工作实例:

boost::shared_ptr<boost::asio::io_service::work> work( new boost::asio::io_service::work( io_service) );

并且在您希望允许 io_servce::run 终止时立即调用您的另一个线程。

work.reset();

另见http://www.boost.org/doc/libs/1_54_0/doc/html/boost_asio/reference/io_service.html#boost_asio.reference.io_service.stopping_the_io_service_from_running_out_of_work

于 2013-08-20T12:52:14.443 回答