2

我想同时使用 Boost.Asio 的 strand 包装器和优先级包装器。

在编写代码之前,我已经阅读了以下信息:

boost asio 的优先级和 strand

boost::asio 和活动对象

http://thread.gmane.org/gmane.comp.lib.boost.asio.user/3531

为什么在使用 boost::asio 时每个连接都需要 strand?

我想使用包装器的方式,因为我想使用各种异步 API,例如 async_read、async_write 和 async_connect。根据 http://thread.gmane.org/gmane.comp.lib.boost.asio.user/3531 ,似乎可以将优先级包装器和 strand 包装器组合使用。

所以我根据下面的例子写了代码:

http://www.boost.org/doc/libs/1_63_0/doc/html/boost_asio/example/cpp03/invocation/prioritised_handlers.cpp

这是我的代码:

#include <iostream>
#include <functional>
#include <queue>
#include <vector>
#include <thread>
#include <mutex>

#include <boost/asio.hpp>
#include <boost/optional.hpp>

#define ENABLE_STRAND 1
#define ENABLE_PRIORITY 1

/// Thread-safe max-priority queue of deferred, argument-less callables.
///
/// Callables are queued with add() (in this program, from the
/// asio_handler_invoke() hook of a wrapped_handler) and are run later,
/// highest priority first, by execute_all().
class handler_priority_queue {
public:
    /// Queues `handler` under `priority`.
    ///
    /// @param priority Entries with a larger value are run earlier by execute_all().
    /// @param handler  A callable invocable with no arguments; it is stored
    ///                 type-erased as a std::function<void()>.
    template <typename Handler>
    void add(int priority, Handler&& handler) {
        std::cout << "add(" << priority << ")" << std::endl;
        std::lock_guard<std::mutex> g(mtx_);
        handlers_.emplace(priority, std::forward<Handler>(handler));
    }

    /// Runs queued entries in descending priority order until the queue is empty.
    ///
    /// The mutex is held only while an entry is taken off the queue and is
    /// released before the entry runs.
    void execute_all() {
        for (;;) {
            std::unique_lock<std::mutex> lock(mtx_);
            if (handlers_.empty()) return;
            queued_handler next = handlers_.top();
            handlers_.pop();
            lock.unlock();  // never run user code while holding the queue lock
            next.execute();
        }
    }

    /// Pairs a handler with the queue and priority it should be scheduled with.
    ///
    /// Calling the wrapper directly just forwards to the wrapped handler; the
    /// deferral into the queue is done by the free asio_handler_invoke() hook.
    template <typename Handler>
    class wrapped_handler {
    public:
        wrapped_handler(handler_priority_queue& q, int p, Handler h)
            : queue_(q), priority_(p), handler_(std::move(h))
        {
        }

        /// Forwards all arguments to the wrapped handler.
        template <typename... Args>
        void operator()(Args&&... args) {
            std::cout << "operator() " << std::endl;
            handler_(std::forward<Args>(args)...);
        }

        // Deliberately left public: the free asio_handler_invoke() hook reads
        // these members.
        handler_priority_queue& queue_;
        int priority_;
        Handler handler_;
    };

    /// Wraps `handler` so that it is associated with this queue and `priority`.
    ///
    /// The handler is taken by value, so the returned wrapper always owns its
    /// own copy (or the moved-in object) and never stores a reference to the
    /// caller's object, even when an lvalue is passed.
    template <typename Handler>
    wrapped_handler<Handler> wrap(int priority, Handler handler) {
        return wrapped_handler<Handler>(*this, priority, std::move(handler));
    }

private:
    /// One queue entry: a priority plus the type-erased callable to run.
    class queued_handler {
    public:
        template <typename Handler>
        queued_handler(int p, Handler&& handler)
            : priority_(p), function_(std::forward<Handler>(handler))
        {
            std::cout << "queued_handler()" << std::endl;
        }

        /// Logs the priority and then invokes the stored callable.
        void execute() {
            std::cout << "execute(" << priority_ << ")" << std::endl;
            function_();
        }

        /// Orders entries by priority only, so std::priority_queue keeps the
        /// entry with the largest priority on top.
        friend bool operator<(
            queued_handler const& lhs,
            queued_handler const & rhs) {
            return lhs.priority_ < rhs.priority_;
        }

    private:
        int priority_;
        std::function<void()> function_;
    };

    std::priority_queue<queued_handler> handlers_;
    std::mutex mtx_;  // guards handlers_
};

// Invocation hook for handler_priority_queue::wrapped_handler.
// Rather than running the invocation f immediately, it parks f in the
// wrapper's queue under the wrapper's priority, to be run by a later
// handler_priority_queue::execute_all() call. f is queued exactly as received.
template <typename Function, typename Handler>
void asio_handler_invoke(Function&& f,
                         handler_priority_queue::wrapped_handler<Handler>* h) {
    std::cout << "asio_handler_invoke " << std::endl;
    auto& wrapper = *h;
    wrapper.queue_.add(wrapper.priority_, std::forward<Function>(f));
}

//----------------------------------------------------------------------

int main() {
    int const num_of_threads = 4;
    int const num_of_tasks = 5;

    boost::asio::io_service ios;
    boost::asio::strand strand(ios);


    handler_priority_queue pq;

    for (int i = 0; i != num_of_tasks; ++i) {
        ios.post(
#if ENABLE_STRAND
            strand.wrap(
#endif
#if ENABLE_PRIORITY
                pq.wrap(
                    i,
#endif
                    [=] {
                        std::cout << "[called] " << i << "," << std::this_thread::get_id() << std::endl;
                    }
#if ENABLE_PRIORITY
                )
#endif
#if ENABLE_STRAND
            )
#endif
        );
    }

    std::vector<std::thread> pool;
    for (int i = 0; i != num_of_threads; ++i) {
        pool.emplace_back([&]{
                std::cout << "before run_one()" << std::endl;
                while (ios.run_one()) {
                    std::cout << "before poll_one()" << std::endl;
                    while (ios.poll_one())
                        ;
                    std::cout << "before execute_all()" << std::endl;
                    pq.execute_all();
                }
            }
        );
    }
    for (auto& t : pool) t.join();
}

包装器由以下宏启用:

#define ENABLE_STRAND 1
#define ENABLE_PRIORITY 1

启用两个宏后,我得到以下结果:

before run_one()
asio_handler_invoke
add(0)
queued_handler()
before poll_one()
asio_handler_invoke
add(1)
queued_handler()
asio_handler_invoke
add(2)
queued_handler()
asio_handler_invoke
add(3)
queued_handler()
asio_handler_invoke
add(4)
queued_handler()
before execute_all()
execute(4)
execute(3)
execute(2)
execute(1)
execute(0)
before run_one()
before run_one()
before run_one()

我期望得到如下格式的输出:

[called] priority,thread_id

例如:

[called] 1,140512649541376

但我没有得到这样的输出。

似乎在函数 execute() 中,function_() 被调用了,但 wrapped_handler::operator() 没有被调用。(函数 execute() 是在我的代码中由 pq.execute_all(); 调用的。)

// Excerpt of queued_handler::execute(): logs the entry's priority, then runs
// the stored callable.
void execute() {
    std::cout << "execute(" << priority_ << ")" << std::endl;
    function_(); // This call is reached.
}

template <typename Handler>
class wrapped_handler {
public:

    template <typename... Args>
    void operator()(Args&&... args) { // It is NOT called
        std::cout << "operator() " << std::endl;
        handler_(std::forward<Args>(args)...);
    }

我跟踪了调用 function_() 之后的执行序列。

调用以下函数:

https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/wrapped_handler.hpp#L191 https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/wrapped_handler.hpp#L76 https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/strand.hpp#L158 https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/impl/strand_service.hpp#L55 https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/impl/strand_service.ipp#L94

然后在函数 bool strand_service::do_dispatch(implementation_type& impl, operation* op) 中,操作 op 没有被调用,而是被推入了队列,见下面这一行:

https://github.com/boostorg/asio/blob/boost-1.63.0/include/boost/asio/detail/impl/strand_service.ipp#L111

我不确定为什么 function_() 会被分派到 strand_service。我以为在我代码中的以下位置,strand 包装器已经被解开了:

// Invocation hook for priority-wrapped handlers: instead of running f right
// away, it stores f in the wrapper's queue under the wrapper's priority.
template <typename Function, typename Handler>
void asio_handler_invoke(Function&& f,
                         handler_priority_queue::wrapped_handler<Handler>* h) {
    std::cout << "asio_handler_invoke " << std::endl;
    // f is queued exactly as it was received.
    h->queue_.add(h->priority_, std::forward<Function>(f));
}

如果我只启用优先级包装器,我会得到以下结果。似乎按我的预期工作。

before run_one()
asio_handler_invoke
add(0)
queued_handler()
before poll_one()
asio_handler_invoke
add(1)
queued_handler()
asio_handler_invoke
add(2)
queued_handler()
asio_handler_invoke
add(3)
queued_handler()
asio_handler_invoke
add(4)
queued_handler()
before execute_all()
execute(4)
operator()
[called] 4,140512649541376
execute(3)
operator()
[called] 3,140512649541376
execute(2)
operator()
[called] 2,140512649541376
execute(1)
operator()
[called] 1,140512649541376
execute(0)
operator()
[called] 0,140512649541376
before run_one()
before run_one()
before run_one()

如果我只启用了 strand wrapper,我会得到以下结果。似乎也像我预期的那样工作。

before run_one()
[called] 0,140127385941760
before poll_one()
[called] 1,140127385941760
[called] 2,140127385941760
[called] 3,140127385941760
[called] 4,140127385941760
before execute_all()
before run_one()
before run_one()
before run_one()

有任何想法吗?

4

1 回答 1

2

我解决了这个问题。

我不确定为什么 function_() 会被分派到 strand_service。我以为在我代码中的以下位置,strand 包装器已经被解开了:

// Invocation hook for priority-wrapped handlers: instead of running f right
// away, it stores f in the wrapper's queue under the wrapper's priority.
template <typename Function, typename Handler>
void asio_handler_invoke(Function&& f,
                         handler_priority_queue::wrapped_handler<Handler>* h) {
    std::cout << "asio_handler_invoke " << std::endl;
    // f is queued exactly as it was received.
    h->queue_.add(h->priority_, std::forward<Function>(f));
}

参数 f 是原始的处理程序,也就是同时被优先队列包装器和 strand 包装器包装过的处理程序,其中 strand 包装器在最外层。因此在调用 f 时,它会被再次分派给 strand_service。这个过程发生在同一个 strand_service 中,因此处理程序不会被调用。

要解决此问题,请将 h->handler_(而不是 f)添加到优先队列中,如下所示:

// Custom invocation hook for wrapped handlers.
//
// Queues the wrapper's own inner handler (h->handler_) under the wrapper's
// priority instead of the function object passed to the hook, so that running
// the queue entry later calls the inner handler directly.
//
// The first parameter is deliberately ignored.
// NOTE(review): anything bound inside that function object (for example
// completion arguments) is therefore dropped, and the queue stores entries as
// std::function<void()>, so this appears limited to handlers callable with no
// arguments; confirm before using it with handlers that take arguments.
template <typename Function, typename Handler>
void asio_handler_invoke(Function&& /*f*/,
                         handler_priority_queue::wrapped_handler<Handler>* h) {
    std::cout << "asio_handler_invoke " << std::endl;
    h->queue_.add(h->priority_, h->handler_);
}

handler_ 是类模板 wrapped_handler 的成员变量,它保存着未经包装的处理程序。

这是完整的代码:

#include <iostream>
#include <functional>
#include <queue>
#include <vector>
#include <thread>
#include <mutex>

#include <boost/asio.hpp>
#include <boost/optional.hpp>

#define ENABLE_STRAND 1
#define ENABLE_PRIORITY 1

/// Thread-safe max-priority queue of deferred, argument-less callables.
///
/// Callables are queued with add() (in this program, from the
/// asio_handler_invoke() hook of a wrapped_handler) and are run later,
/// highest priority first, by execute_all().
class handler_priority_queue {
public:
    /// Queues `handler` under `priority`.
    ///
    /// @param priority Entries with a larger value are run earlier by execute_all().
    /// @param handler  A callable invocable with no arguments; it is stored
    ///                 type-erased as a std::function<void()>.
    template <typename Handler>
    void add(int priority, Handler&& handler) {
        std::cout << "add(" << priority << ")" << std::endl;
        std::lock_guard<std::mutex> g(mtx_);
        handlers_.emplace(priority, std::forward<Handler>(handler));
    }

    /// Runs queued entries in descending priority order until the queue is empty.
    ///
    /// The mutex is held only while an entry is taken off the queue and is
    /// released before the entry runs.
    void execute_all() {
        for (;;) {
            std::unique_lock<std::mutex> lock(mtx_);
            if (handlers_.empty()) return;
            queued_handler next = handlers_.top();
            handlers_.pop();
            lock.unlock();  // never run user code while holding the queue lock
            next.execute();
        }
    }

    /// Pairs a handler with the queue and priority it should be scheduled with.
    ///
    /// Calling the wrapper directly just forwards to the wrapped handler; the
    /// deferral into the queue is done by the free asio_handler_invoke() hook.
    template <typename Handler>
    class wrapped_handler {
    public:
        template <typename HandlerArg>
        wrapped_handler(handler_priority_queue& q, int p, HandlerArg&& h)
            : queue_(q), priority_(p), handler_(std::forward<HandlerArg>(h))
        {
        }

        /// Forwards all arguments to the wrapped handler.
        template <typename... Args>
        void operator()(Args&&... args) {
            std::cout << "operator() " << std::endl;
            handler_(std::forward<Args>(args)...);
        }

        // Deliberately left public: the free asio_handler_invoke() hook reads
        // these members.
        handler_priority_queue& queue_;
        int priority_;
        Handler handler_;
    };

    /// Wraps `handler` so that it is associated with this queue and `priority`.
    ///
    /// The handler is taken by value, so the returned wrapper always owns its
    /// own copy (or the moved-in object) and never stores a reference to the
    /// caller's object, even when an lvalue is passed.
    template <typename Handler>
    wrapped_handler<Handler> wrap(int priority, Handler handler) {
        return wrapped_handler<Handler>(*this, priority, std::move(handler));
    }

private:
    /// One queue entry: a priority plus the type-erased callable to run.
    class queued_handler {
    public:
        template <typename Handler>
        queued_handler(int p, Handler&& handler)
            : priority_(p), function_(std::forward<Handler>(handler))
        {
            std::cout << "queued_handler()" << std::endl;
        }

        /// Logs the priority and then invokes the stored callable.
        void execute() {
            std::cout << "execute(" << priority_ << ")" << std::endl;
            function_();
        }

        /// Orders entries by priority only, so std::priority_queue keeps the
        /// entry with the largest priority on top.
        friend bool operator<(
            queued_handler const& lhs,
            queued_handler const & rhs) {
            return lhs.priority_ < rhs.priority_;
        }

    private:
        int priority_;
        std::function<void()> function_;
    };

    std::priority_queue<queued_handler> handlers_;
    std::mutex mtx_;  // guards handlers_
};

// Custom invocation hook for wrapped handlers.
//
// Queues the wrapper's own inner handler (h->handler_) under the wrapper's
// priority instead of the function object passed to the hook, so that running
// the queue entry later calls the inner handler directly.
//
// The first parameter is deliberately ignored.
// NOTE(review): anything bound inside that function object (for example
// completion arguments) is therefore dropped, and the queue stores entries as
// std::function<void()>, so this appears limited to handlers callable with no
// arguments; confirm before using it with handlers that take arguments.
template <typename Function, typename Handler>
void asio_handler_invoke(Function&& /*f*/,
                         handler_priority_queue::wrapped_handler<Handler>* h) {
    std::cout << "asio_handler_invoke " << std::endl;
    h->queue_.add(h->priority_, h->handler_);
}

//----------------------------------------------------------------------

// Demo driver: posts num_of_tasks handlers to the io_service, each optionally
// wrapped by the strand and/or the priority queue (selected at compile time by
// ENABLE_STRAND / ENABLE_PRIORITY), then runs num_of_threads worker threads.
int main() {
    int const num_of_threads = 4;
    int const num_of_tasks = 5;

    boost::asio::io_service ios;
    boost::asio::strand strand(ios);


    handler_priority_queue pq;

    for (int i = 0; i != num_of_tasks; ++i) {
        // Nesting, outermost first: strand wrapper, priority wrapper, user lambda.
        ios.post(
#if ENABLE_STRAND
            strand.wrap(
#endif
#if ENABLE_PRIORITY
                pq.wrap(
                    i,
#endif
                    [=] {
                        std::cout << "[called] " << i << "," << std::this_thread::get_id() << std::endl;
                    }
            // The closing guards mirror the opening ones in reverse order:
            // the inner priority wrapper is closed first, then the strand wrapper.
#if ENABLE_PRIORITY
                )
#endif
#if ENABLE_STRAND
            )
#endif
        );
    }

    std::vector<std::thread> pool;
    for (int i = 0; i != num_of_threads; ++i) {
        pool.emplace_back([&]{
                std::cout << "before run_one()" << std::endl;
                // Each pass: run one handler, poll until nothing more is
                // ready, then drain the priority queue.
                while (ios.run_one()) {
                    std::cout << "before poll_one()" << std::endl;
                    while (ios.poll_one())
                        ;
                    std::cout << "before execute_all()" << std::endl;
                    pq.execute_all();
                }
            }
        );
    }
    for (auto& t : pool) t.join();
}

这是一个输出:

before run_one()
asio_handler_invoke
add(0)
queued_handler()
before poll_one()
asio_handler_invoke
add(1)
queued_handler()
asio_handler_invoke
add(2)
queued_handler()
asio_handler_invoke
add(3)
queued_handler()
asio_handler_invoke
add(4)
queued_handler()
before execute_all()
execute(4)
[called] 4,139903315736320
execute(3)
[called] 3,139903315736320
execute(2)
[called] 2,139903315736320
execute(1)
[called] 1,139903315736320
execute(0)
[called] 0,139903315736320
before run_one()
before run_one()
before run_one()
于 2017-04-19T03:18:13.490 回答