4

主题

这个概念验证(POC)应用程序使用 Boost.Asio 和协程来处理 HTTP 请求。请求被完整读取后,连接处理程序会把实际的请求处理工作交给一个专门用于 CPU 密集型操作的独立线程池,同时连接处理程序的协程被挂起。响应生成完毕后,该协程恢复执行并发送响应。

问题

使用 wrk -d 30 -c 100 -t 100 http://127.0.0.1:8910/ 对应用程序进行测试时,程序迟早会失败:

grumpycathttpd: /usr/include/boost/coroutine/detail/push_coroutine_impl.hpp:258: void boost::coroutines::detail::push_coroutine_impl<void>::push(): Assertion `! is_running()' failed.

我的问题

  1. 这个应用程序的设计思路(运行协程的 ASIO 线程池 + 带任务队列的 CPU 密集型线程池)是否可行、是否现实?
  2. 如果可行:我在挂起/恢复协程时到底做错了什么?难道我不应该像 Boost 库自身那样使用 async_completion、completion_handler 和 asio_handler_invoke 吗?

代码

据我所知,我应该贴出应用程序的代码,所以代码如下……希望 223 行的篇幅不成问题。

在我看来,关键的代码行是第 160 行、第 193-198 行、第 92-102 行和第 106-109 行。

/* 1 */ #include <condition_variable>
/* 2 */ using std::condition_variable;
/* 3 */
/* 4 */ #include <exception>
/* 5 */ using std::exception;
/* 6 */
/* 7 */ #include <functional>
/* 8 */ using std::function;
/* 9 */
/* 10 */ #include <iostream>
/* 11 */ using std::cout;
/* 12 */ using std::endl;
/* 13 */
/* 14 */ #include <limits>
/* 15 */ using std::numeric_limits;
/* 16 */
/* 17 */ #include <memory>
/* 18 */ using std::make_shared;
/* 19 */ using std::shared_ptr;
/* 20 */
/* 21 */ #include <mutex>
/* 22 */ using std::mutex;
/* 23 */ using std::unique_lock;
/* 24 */
/* 25 */ #include <queue>
/* 26 */ using std::queue;
/* 27 */
/* 28 */ #include <thread>
/* 29 */ using std::thread;
/* 30 */
/* 31 */ #include <utility>
/* 32 */ using std::move;
/* 33 */
/* 34 */ #include <vector>
/* 35 */ using std::vector;
/* 36 */
/*    */ #include <boost/asio/associated_executor.hpp>
/*    */ using boost::asio::get_associated_executor;
/*    */
/* 37 */ #include <boost/asio/async_result.hpp>
/* 38 */ using boost::asio::async_completion;
/* 39 */
/* 40 */ #include <boost/asio/buffer.hpp>
/* 41 */ using boost::asio::mutable_buffer;
/* 42 */
/* 43 */ #include <boost/asio/buffered_stream.hpp>
/* 44 */ using boost::asio::buffered_stream;
/* 45 */
/* 46 */ #include <boost/asio/handler_invoke_hook.hpp>
/* 47 */ using boost::asio::asio_handler_invoke;
/* 48 */
/* 49 */ #include <boost/asio/io_service.hpp>
/* 50 */ using boost::asio::io_service;
/* 51 */
/* 52 */ #include <boost/asio/ip/tcp.hpp>
/* 53 */ using boost::asio::ip::tcp;
/* 54 */
/*    */ #include <boost/asio/post.hpp>
/*    */ using boost::asio::post;
/*    */
/* 55 */ #include <boost/asio/spawn.hpp>
/* 56 */ using boost::asio::spawn;
/* 57 */
/* 58 */ #include <boost/asio/yield.hpp>
/* 59 */ using boost::asio::yield_context;
/* 60 */
/* 61 */ #include <boost/beast/core.hpp>
/* 62 */ namespace beast = boost::beast;
/* 63 */
/* 64 */ #include <boost/beast/http.hpp>
/* 65 */ namespace http = beast::http;
/* 66 */
/* 67 */ #include <boost/system/error_code.hpp>
/* 68 */ using boost::system::error_code;
/* 69 */ 
/* 70 */ 
/* 71 */ class work_queue {
/* 72 */ public:
/* 73 */    work_queue(io_service& io) : io(io), pool((vector<thread>::size_type)(thread::hardware_concurrency())), stop(false) {
/* 74 */        for (auto& thrd : pool) {
/* 75 */            thrd = thread([this](){ run(); });
/* 76 */        }
/* 77 */    }
/* 78 */ 
/* 79 */    ~work_queue() {
/* 80 */        {
/* 81 */            unique_lock<mutex> ul (mtx);
/* 82 */            stop = true;
/* 83 */            cond_var.notify_all();
/* 84 */        }
/* 85 */ 
/* 86 */        for (auto& thrd : pool) {
/* 87 */            thrd.join();
/* 88 */        }
/* 89 */    }
/* 90 */ 
/* 91 */    template<class Handler>
/* 92 */    void async_run(function<void()> task, Handler&& handler) {
/* 93 */        async_completion<Handler, void(error_code)> init(handler);
/* 94 */ 
/* 95 */        {
/* 96 */            auto completion_handler (make_shared<decltype(init.completion_handler)>(init.completion_handler));
/* 97 */            unique_lock<mutex> ul (mtx);
/* 98 */            tasks.emplace(enqueued_task({move(task), [completion_handler](){ asio_handler_invoke(*completion_handler); }}));
/* 99 */            cond_var.notify_all();
/* 100 */       }
/* 101 */ 
/* 102 */       init.result.get();
/* 103 */   }
/* 104 */ 
/* 105 */ private:
/* 106 */   struct enqueued_task {
/* 107 */       function<void()> task;
/* 108 */       function<void()> on_done;
/* 109 */   };
/* 110 */ 
/* 111 */   mutex mtx;
/* 112 */   condition_variable cond_var;
/* 113 */   io_service& io;
/* 114 */   queue<enqueued_task> tasks;
/* 115 */   vector<thread> pool;
/* 116 */   bool stop;
/* 117 */ 
/* 118 */   void run() {
/* 119 */       unique_lock<mutex> ul (mtx);
/* 120 */ 
/* 121 */       while (!stop) {
/* 122 */           while (!tasks.empty()) {
/* 123 */               auto task (move(tasks.front()));
/* 124 */               tasks.pop();
/* 125 */ 
/* 126 */               ul.unlock();
/* 127 */ 
/* 128 */               try {
/* 129 */                   task.task();
/* 130 */               } catch (...) {
/* 131 */               }
/* 132 */ 
/* 133 */               io.post(move(task.on_done));
/* 134 */ 
/* 135 */               ul.lock();
/* 136 */           }
/* 137 */ 
/* 138 */           cond_var.wait(ul);
/* 139 */       }
/* 140 */   }
/* 141 */ };
/* 142 */ 
/* 143 */ int main() {
/* 144 */   vector<thread> pool ((vector<thread>::size_type)(thread::hardware_concurrency()));
/* 145 */   io_service io;
/* 146 */   work_queue wq (io);
/* 147 */   tcp::acceptor acceptor (io);
/* 148 */   tcp::endpoint endpoint (tcp::v6(), 8910);
/* 149 */ 
/* 150 */   acceptor.open(endpoint.protocol());
/* 151 */   acceptor.set_option(tcp::acceptor::reuse_address(true));
/* 152 */   acceptor.bind(endpoint);
/* 153 */   acceptor.listen(numeric_limits<int>::max());
/* 154 */ 
/* 155 */   spawn(acceptor.get_io_context(), [&acceptor, &wq](yield_context yc) {
/* 156 */       for (;;) {
/* 157 */           shared_ptr<tcp::socket> peer (new tcp::socket(acceptor.get_io_context()));
/* 158 */           acceptor.async_accept(*peer, yc);
/* 159 */ 
/* 160 */           spawn(acceptor.get_io_context(), [peer, &wq](yield_context yc) {
/* 161 */               try {
/* 162 */                   {
/* 163 */                       auto remote (peer->remote_endpoint());
/* 164 */                       cout << "I has conn: [" << remote.address().to_string() << "]:" << remote.port() << endl;
/* 165 */                   }
/* 166 */ 
/* 167 */                   buffered_stream<decltype(*peer)> iobuf (*peer);
/* 168 */ 
/* 169 */                   iobuf.async_fill(yc);
/* 170 */ 
/* 171 */                   if (iobuf.in_avail() > 0) {
/* 172 */                       char first_char;
/* 173 */ 
/* 174 */                       {
/* 175 */                           mutable_buffer first_char_buf (&first_char, 1);
/* 176 */                           iobuf.peek(first_char_buf);
/* 177 */                       }
/* 178 */ 
/* 179 */                       if ('0' <= first_char && first_char <= '9') {
/* 180 */                           cout << "I has JSON-RPC!" << endl;
/* 181 */                       } else {
/* 182 */                           beast::flat_buffer buf;
/* 183 */ 
/* 184 */                           for (;;) {
/* 185 */                               http::request<http::string_body> req;
/* 186 */ 
/* 187 */                               http::async_read(iobuf, buf, req, yc);
/* 188 */ 
/* 189 */                               cout << "I has req: '" << req.body() << '\'' << endl;
/* 190 */ 
/* 191 */                               http::response<http::string_body> res;
/* 192 */ 
/* 193 */                               wq.async_run([&req, &res](){
/* 194 */                                   res.result(http::status::internal_server_error);
/* 195 */                                   res.set(http::field::content_type, "text/plain");
/* 196 */                                   res.set(http::field::content_length, "36");
/* 197 */                                   res.body() = "I like onions. They make people cry.";
/* 198 */                               }, yc);
/* 199 */ 
/* 200 */                               http::async_write(iobuf, res, yc);
/* 201 */                               iobuf.async_flush(yc);
/* 202 */ 
/* 203 */                               cout << "I has res." << endl;
/* 204 */                           }
/* 205 */                       }
/* 206 */                   }
/* 207 */ 
/* 208 */                   peer->shutdown(peer->shutdown_both);
/* 209 */               } catch (const exception& e) {
/* 210 */                   cout << "I has exception: " << e.what() << endl;
/* 211 */               }
/* 212 */           });
/* 213 */       }
/* 214 */   });
/* 215 */ 
/* 216 */   for (auto& thrd : pool) {
/* 217 */       thrd = thread([&io](){ io.run(); });
/* 218 */   }
/* 219 */ 
/* 220 */   for (auto& thrd : pool) {
/* 221 */       thrd.join();
/* 222 */   }
/* 223 */ }

编辑#1:回溯

...
Assertion failed: (! is_running()), function push, file CENSORED/include/boost/coroutine/detail/push_coroutine_impl.hpp, line 258.
Process 43694 stopped
* thread #2, stop reason = signal SIGABRT
    frame #0: 0x00007fff7567123e libsystem_kernel.dylib`__pthread_kill + 10
libsystem_kernel.dylib`__pthread_kill:
->  0x7fff7567123e <+10>: jae    0x7fff75671248            ; <+20>
    0x7fff75671240 <+12>: movq   %rax, %rdi
    0x7fff75671243 <+15>: jmp    0x7fff7566b3b7            ; cerror_nocancel
    0x7fff75671248 <+20>: retq   
Target 0: (grumpycathttpd) stopped.
(lldb) bt all
  thread #1, queue = 'com.apple.main-thread'
    frame #0: 0x00007fff7566d94e libsystem_kernel.dylib`__ulock_wait + 10
    frame #1: 0x00007fff7572970a libsystem_pthread.dylib`_pthread_join + 356
    frame #2: 0x00007fff72c48978 libc++.1.dylib`std::__1::thread::join() + 24
    frame #3: 0x00000001000039b8 grumpycathttpd`main + 1544
    frame #4: 0x00007fff75531ed9 libdyld.dylib`start + 1
    frame #5: 0x00007fff75531ed9 libdyld.dylib`start + 1
* thread #2, stop reason = signal SIGABRT
  * frame #0: 0x00007fff7567123e libsystem_kernel.dylib`__pthread_kill + 10
    frame #1: 0x00007fff75727c1c libsystem_pthread.dylib`pthread_kill + 285
    frame #2: 0x00007fff755da1c9 libsystem_c.dylib`abort + 127
    frame #3: 0x00007fff755a2868 libsystem_c.dylib`__assert_rtn + 320
    frame #4: 0x00000001000680ea grumpycathttpd`boost::coroutines::detail::push_coroutine_impl<void>::push() + 122
    frame #5: 0x000000010000884c grumpycathttpd`boost::asio::detail::completion_handler<std::__1::function<void ()> >::do_complete(void*, boost::asio::detail::scheduler_operation*, boost::system::error_code const&, unsigned long) + 156
    frame #6: 0x00000001000691e6 grumpycathttpd`boost::asio::detail::scheduler::do_run_one(boost::asio::detail::conditionally_enabled_mutex::scoped_lock&, boost::asio::detail::scheduler_thread_info&, boost::system::error_code const&) + 822
    frame #7: 0x0000000100068cb2 grumpycathttpd`boost::asio::detail::scheduler::run(boost::system::error_code&) + 194
    frame #8: 0x0000000100068aea grumpycathttpd`void* std::__1::__thread_proxy<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, main::$_1> >(void*) + 74
    frame #9: 0x00007fff75725305 libsystem_pthread.dylib`_pthread_body + 126
    frame #10: 0x00007fff7572826f libsystem_pthread.dylib`_pthread_start + 70
    frame #11: 0x00007fff75724415 libsystem_pthread.dylib`thread_start + 13
  thread #3
    frame #0: 0x00007fff7566e7de libsystem_kernel.dylib`__psynch_cvwait + 10
    frame #1: 0x00007fff75728593 libsystem_pthread.dylib`_pthread_cond_wait + 724
    frame #2: 0x00007fff72c07bda libc++.1.dylib`std::__1::condition_variable::wait(std::__1::unique_lock<std::__1::mutex>&) + 18
    frame #3: 0x00000001000081f4 grumpycathttpd`work_queue::run() + 644
    frame #4: 0x0000000100007efd grumpycathttpd`void* std::__1::__thread_proxy<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, work_queue::work_queue(boost::asio::io_context&)::'lambda'()> >(void*) + 45
    frame #5: 0x00007fff75725305 libsystem_pthread.dylib`_pthread_body + 126
    frame #6: 0x00007fff7572826f libsystem_pthread.dylib`_pthread_start + 70
    frame #7: 0x00007fff75724415 libsystem_pthread.dylib`thread_start + 13
  thread #4
    frame #0: 0x00007fff7566e7de libsystem_kernel.dylib`__psynch_cvwait + 10
    frame #1: 0x00007fff75728593 libsystem_pthread.dylib`_pthread_cond_wait + 724
    frame #2: 0x00007fff72c07bda libc++.1.dylib`std::__1::condition_variable::wait(std::__1::unique_lock<std::__1::mutex>&) + 18
    frame #3: 0x00000001000081f4 grumpycathttpd`work_queue::run() + 644
    frame #4: 0x0000000100007efd grumpycathttpd`void* std::__1::__thread_proxy<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, work_queue::work_queue(boost::asio::io_context&)::'lambda'()> >(void*) + 45
    frame #5: 0x00007fff75725305 libsystem_pthread.dylib`_pthread_body + 126
    frame #6: 0x00007fff7572826f libsystem_pthread.dylib`_pthread_start + 70
    frame #7: 0x00007fff75724415 libsystem_pthread.dylib`thread_start + 13
  thread #5
    frame #0: 0x00007fff7566e7de libsystem_kernel.dylib`__psynch_cvwait + 10
    frame #1: 0x00007fff75728593 libsystem_pthread.dylib`_pthread_cond_wait + 724
    frame #2: 0x00007fff72c07bda libc++.1.dylib`std::__1::condition_variable::wait(std::__1::unique_lock<std::__1::mutex>&) + 18
    frame #3: 0x00000001000081f4 grumpycathttpd`work_queue::run() + 644
    frame #4: 0x0000000100007efd grumpycathttpd`void* std::__1::__thread_proxy<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, work_queue::work_queue(boost::asio::io_context&)::'lambda'()> >(void*) + 45
    frame #5: 0x00007fff75725305 libsystem_pthread.dylib`_pthread_body + 126
    frame #6: 0x00007fff7572826f libsystem_pthread.dylib`_pthread_start + 70
    frame #7: 0x00007fff75724415 libsystem_pthread.dylib`thread_start + 13
  thread #6
    frame #0: 0x00007fff7566e7de libsystem_kernel.dylib`__psynch_cvwait + 10
    frame #1: 0x00007fff75728593 libsystem_pthread.dylib`_pthread_cond_wait + 724
    frame #2: 0x00007fff72c07bda libc++.1.dylib`std::__1::condition_variable::wait(std::__1::unique_lock<std::__1::mutex>&) + 18
    frame #3: 0x00000001000081f4 grumpycathttpd`work_queue::run() + 644
    frame #4: 0x0000000100007efd grumpycathttpd`void* std::__1::__thread_proxy<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, work_queue::work_queue(boost::asio::io_context&)::'lambda'()> >(void*) + 45
    frame #5: 0x00007fff75725305 libsystem_pthread.dylib`_pthread_body + 126
    frame #6: 0x00007fff7572826f libsystem_pthread.dylib`_pthread_start + 70
    frame #7: 0x00007fff75724415 libsystem_pthread.dylib`thread_start + 13
  thread #7
    frame #0: 0x00007fff7566de76 libsystem_kernel.dylib`__psynch_mutexwait + 10
    frame #1: 0x00007fff75725d6c libsystem_pthread.dylib`_pthread_mutex_firstfit_lock_wait + 96
    frame #2: 0x00007fff757234b7 libsystem_pthread.dylib`_pthread_mutex_firstfit_lock_slow + 226
    frame #3: 0x00000001000696fb grumpycathttpd`boost::asio::detail::kqueue_reactor::run(long, boost::asio::detail::op_queue<boost::asio::detail::scheduler_operation>&) + 379
    frame #4: 0x0000000100069070 grumpycathttpd`boost::asio::detail::scheduler::do_run_one(boost::asio::detail::conditionally_enabled_mutex::scoped_lock&, boost::asio::detail::scheduler_thread_info&, boost::system::error_code const&) + 448
    frame #5: 0x0000000100068cb2 grumpycathttpd`boost::asio::detail::scheduler::run(boost::system::error_code&) + 194
    frame #6: 0x0000000100068aea grumpycathttpd`void* std::__1::__thread_proxy<std::__1::tuple<std::__1::unique_ptr<std::__1::__thread_struct, std::__1::default_delete<std::__1::__thread_struct> >, main::$_1> >(void*) + 74
    frame #7: 0x00007fff75725305 libsystem_pthread.dylib`_pthread_body + 126
    frame #8: 0x00007fff7572826f libsystem_pthread.dylib`_pthread_start + 70
    frame #9: 0x00007fff75724415 libsystem_pthread.dylib`thread_start + 13
  thread #8
    frame #0: 0x00007fff756706e6 libsystem_kernel.dylib`__sendmsg + 10
    frame #1: 0x0000000100037de4 grumpycathttpd`boost::asio::detail::socket_ops::non_blocking_send(int, iovec const*, unsigned long, int, boost::system::error_code&, unsigned long&) + 100
    frame #2: 0x0000000100037d4f grumpycathttpd`boost::asio::detail::reactive_socket_send_op_base<boost::asio::const_buffers_1>::do_perform(boost::asio::detail::reactor_op*) + 63
    frame #3: 0x000000010000fb00 grumpycathttpd`boost::asio::detail::kqueue_reactor::start_op(int, int, boost::asio::detail::kqueue_reactor::descriptor_state*&, boost::asio::detail::reactor_op*, bool, bool) + 304
    frame #4: 0x000000010000f927 grumpycathttpd`boost::asio::detail::reactive_socket_service_base::start_op(boost::asio::detail::reactive_socket_service_base::base_implementation_type&, int, boost::asio::detail::reactor_op*, bool, bool, bool) + 103
    frame #5: 0x0000000100057c17 grumpycathttpd`void boost::asio::detail::reactive_socket_service_base::async_send<boost::asio::const_buffers_1, boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp>, boost::asio::mutable_buffers_1, boost::asio::mutable_buffer const*, boost::asio::detail::transfer_all_t, boost::asio::detail::buffered_flush_handler<boost::asio::detail::coro_handler<boost::asio::executor_binder<void (*)(), boost::asio::executor>, unsigned long> > > >(boost::asio::detail::reactive_socket_service_base::base_implementation_type&, boost::asio::const_buffers_1 const&, int, boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp>, boost::asio::mutable_buffers_1, boost::asio::mutable_buffer const*, boost::asio::detail::transfer_all_t, boost::asio::detail::buffered_flush_handler<boost::asio::detail::coro_handler<boost::asio::executor_binder<void (*)(), boost::asio::executor>, unsigned long> > >&) + 135
    frame #6: 0x00000001000579b7 grumpycathttpd`void boost::asio::detail::start_write_buffer_sequence_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp>, boost::asio::mutable_buffers_1, boost::asio::mutable_buffer const*, boost::asio::detail::transfer_all_t, boost::asio::detail::buffered_flush_handler<boost::asio::detail::coro_handler<boost::asio::executor_binder<void (*)(), boost::asio::executor>, unsigned long> > >(boost::asio::basic_stream_socket<boost::asio::ip::tcp>&, boost::asio::mutable_buffers_1 const&, boost::asio::mutable_buffer const* const&, boost::asio::detail::transfer_all_t, boost::asio::detail::buffered_flush_handler<boost::asio::detail::coro_handler<boost::asio::executor_binder<void (*)(), boost::asio::executor>, unsigned long> >&) + 231
    frame #7: 0x000000010005778d grumpycathttpd`boost::asio::async_result<std::__1::decay<boost::asio::basic_yield_context<boost::asio::executor_binder<void (*)(), boost::asio::executor> >&>::type, void (boost::system::error_code, unsigned long)>::return_type boost::asio::buffered_write_stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp>&>::async_flush<boost::asio::basic_yield_context<boost::asio::executor_binder<void (*)(), boost::asio::executor> >&>(boost::asio::basic_yield_context<boost::asio::executor_binder<void (*)(), boost::asio::executor> >&&&) + 573
    frame #8: 0x0000000100010cc8 grumpycathttpd`boost::coroutines::detail::push_coroutine_object<boost::coroutines::pull_coroutine<void>, void, boost::asio::detail::coro_entry_point<boost::asio::executor_binder<void (*)(), boost::asio::strand<boost::asio::io_context::executor_type> >, main::$_0::operator()(boost::asio::basic_yield_context<boost::asio::executor_binder<void (*)(), boost::asio::executor> >) const::'lambda'(boost::asio::basic_yield_context<boost::asio::executor_binder<void (*)(), boost::asio::executor> >)>&, boost::coroutines::basic_standard_stack_allocator<boost::coroutines::stack_traits> >::run() + 2040
    frame #9: 0x00000001000104d0 grumpycathttpd`void boost::coroutines::detail::trampoline_push_void<boost::coroutines::detail::push_coroutine_object<boost::coroutines::pull_coroutine<void>, void, boost::asio::detail::coro_entry_point<boost::asio::executor_binder<void (*)(), boost::asio::strand<boost::asio::io_context::executor_type> >, main::$_0::operator()(boost::asio::basic_yield_context<boost::asio::executor_binder<void (*)(), boost::asio::executor> >) const::'lambda'(boost::asio::basic_yield_context<boost::asio::executor_binder<void (*)(), boost::asio::executor> >)>&, boost::coroutines::basic_standard_stack_allocator<boost::coroutines::stack_traits> > >(boost::context::detail::transfer_t) + 96
    frame #10: 0x000000010021832f libboost_context-mt.dylib`trampoline + 3
  thread #9
    frame #0: 0x00007fff756706e6 libsystem_kernel.dylib`__sendmsg + 10
    frame #1: 0x0000000100037de4 grumpycathttpd`boost::asio::detail::socket_ops::non_blocking_send(int, iovec const*, unsigned long, int, boost::system::error_code&, unsigned long&) + 100
    frame #2: 0x0000000100037d4f grumpycathttpd`boost::asio::detail::reactive_socket_send_op_base<boost::asio::const_buffers_1>::do_perform(boost::asio::detail::reactor_op*) + 63
    frame #3: 0x000000010000fb00 grumpycathttpd`boost::asio::detail::kqueue_reactor::start_op(int, int, boost::asio::detail::kqueue_reactor::descriptor_state*&, boost::asio::detail::reactor_op*, bool, bool) + 304
    frame #4: 0x000000010000f927 grumpycathttpd`boost::asio::detail::reactive_socket_service_base::start_op(boost::asio::detail::reactive_socket_service_base::base_implementation_type&, int, boost::asio::detail::reactor_op*, bool, bool, bool) + 103
    frame #5: 0x0000000100057c17 grumpycathttpd`void boost::asio::detail::reactive_socket_service_base::async_send<boost::asio::const_buffers_1, boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp>, boost::asio::mutable_buffers_1, boost::asio::mutable_buffer const*, boost::asio::detail::transfer_all_t, boost::asio::detail::buffered_flush_handler<boost::asio::detail::coro_handler<boost::asio::executor_binder<void (*)(), boost::asio::executor>, unsigned long> > > >(boost::asio::detail::reactive_socket_service_base::base_implementation_type&, boost::asio::const_buffers_1 const&, int, boost::asio::detail::write_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp>, boost::asio::mutable_buffers_1, boost::asio::mutable_buffer const*, boost::asio::detail::transfer_all_t, boost::asio::detail::buffered_flush_handler<boost::asio::detail::coro_handler<boost::asio::executor_binder<void (*)(), boost::asio::executor>, unsigned long> > >&) + 135
    frame #6: 0x00000001000579b7 grumpycathttpd`void boost::asio::detail::start_write_buffer_sequence_op<boost::asio::basic_stream_socket<boost::asio::ip::tcp>, boost::asio::mutable_buffers_1, boost::asio::mutable_buffer const*, boost::asio::detail::transfer_all_t, boost::asio::detail::buffered_flush_handler<boost::asio::detail::coro_handler<boost::asio::executor_binder<void (*)(), boost::asio::executor>, unsigned long> > >(boost::asio::basic_stream_socket<boost::asio::ip::tcp>&, boost::asio::mutable_buffers_1 const&, boost::asio::mutable_buffer const* const&, boost::asio::detail::transfer_all_t, boost::asio::detail::buffered_flush_handler<boost::asio::detail::coro_handler<boost::asio::executor_binder<void (*)(), boost::asio::executor>, unsigned long> >&) + 231
    frame #7: 0x000000010005778d grumpycathttpd`boost::asio::async_result<std::__1::decay<boost::asio::basic_yield_context<boost::asio::executor_binder<void (*)(), boost::asio::executor> >&>::type, void (boost::system::error_code, unsigned long)>::return_type boost::asio::buffered_write_stream<boost::asio::basic_stream_socket<boost::asio::ip::tcp>&>::async_flush<boost::asio::basic_yield_context<boost::asio::executor_binder<void (*)(), boost::asio::executor> >&>(boost::asio::basic_yield_context<boost::asio::executor_binder<void (*)(), boost::asio::executor> >&&&) + 573
    frame #8: 0x0000000100010cc8 grumpycathttpd`boost::coroutines::detail::push_coroutine_object<boost::coroutines::pull_coroutine<void>, void, boost::asio::detail::coro_entry_point<boost::asio::executor_binder<void (*)(), boost::asio::strand<boost::asio::io_context::executor_type> >, main::$_0::operator()(boost::asio::basic_yield_context<boost::asio::executor_binder<void (*)(), boost::asio::executor> >) const::'lambda'(boost::asio::basic_yield_context<boost::asio::executor_binder<void (*)(), boost::asio::executor> >)>&, boost::coroutines::basic_standard_stack_allocator<boost::coroutines::stack_traits> >::run() + 2040
    frame #9: 0x00000001000104d0 grumpycathttpd`void boost::coroutines::detail::trampoline_push_void<boost::coroutines::detail::push_coroutine_object<boost::coroutines::pull_coroutine<void>, void, boost::asio::detail::coro_entry_point<boost::asio::executor_binder<void (*)(), boost::asio::strand<boost::asio::io_context::executor_type> >, main::$_0::operator()(boost::asio::basic_yield_context<boost::asio::executor_binder<void (*)(), boost::asio::executor> >) const::'lambda'(boost::asio::basic_yield_context<boost::asio::executor_binder<void (*)(), boost::asio::executor> >)>&, boost::coroutines::basic_standard_stack_allocator<boost::coroutines::stack_traits> > >(boost::context::detail::transfer_t) + 96
    frame #10: 0x000000010021832f libboost_context-mt.dylib`trampoline + 3
(lldb)

编辑#2

看来我的错误之一出在 work_queue#async_run() 中,它应该写成:

    // Second attempt at work_queue::async_run(): queues `task` together with an
    // on_done callback that invokes the completion handler through the
    // asio_handler_invoke hook, then waits on init.result.get().
    // NOTE(review): the post states that this version still hits the assertion.
    // Presumably the hook does not route the call through the coroutine's
    // strand in this Boost.Asio version — confirm against the library source.
    template<class Handler>
    void async_run(function<void()> task, Handler&& handler) {
        async_completion<Handler, void(error_code)> init(handler);

        {
            // Copy the handler so the queued callback owns it.
            auto completion_handler (init.completion_handler);
            unique_lock<mutex> ul (mtx);
            tasks.emplace(enqueued_task({move(task), [completion_handler](){
                // The captured copy is const inside this non-mutable lambda, so a
                // local non-const copy is made to pass as both function and context.
                auto completion_handler_ (completion_handler);
                asio_handler_invoke(completion_handler_, &completion_handler_);
            }}));
            cond_var.notify_all();
        }

        init.result.get();
    }

但这并没有解决问题。

4

0 回答 0