在您编辑问题之前,我就已经开始研究一种真正的异步方法:
先把必要的准备工作(头文件、命名空间别名等)处理好:
#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>
namespace ba = boost::asio;
namespace bp = boost::process;
#include <iostream>
#define LOG(x) std::clog
现在让我们创建一个 ProcessManager:它在单个 io_service 上运行所有进程,
并在析构函数中关闭该 io_service。
IO 服务用于调度所有工作(如异步 IO)。我做了如下取舍:
- 随意地决定只关注面向行的 IO 操作
- 认为很可能没有理由使用超过 1 个 IO 线程;但如果您确实需要多个线程,代码中有一个
strand
可以正确同步与每个子进程相关的操作。
#include <algorithm>
#include <iomanip>
#include <list>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
#include <boost/optional.hpp>
/// Starts child processes and services their stdin/stdout asynchronously.
///
/// All asynchronous work is scheduled on one io_service that is run by a
/// single background thread (`io_thread`). Children are kept in `processList`
/// under a caller-chosen name. The destructor releases the keep-alive work
/// object and joins the IO thread.
class ProcessManager { // ugh naming smell
    using error_code = boost::system::error_code;

  private:
    // NOTE: declaration order matters, members are initialized in this order
    // and the IO thread starts running as soon as `io_thread` is constructed.
    ba::io_service                        _service;
    boost::optional<ba::io_service::work> _keep{_service}; // reset in the destructor so run() can finish
    boost::process::group                 _group;
    std::thread                           io_thread;

    /// One child process plus the buffers for its line-oriented IO.
    ///
    /// Instances are always owned by shared_ptr (see start()); every pending
    /// asynchronous handler captures a shared_ptr to keep the object alive.
    struct patchProcess : std::enable_shared_from_this<patchProcess> {
        using ptr = std::shared_ptr<patchProcess>;

        /// Launches `command` with `args` and begins reading its stdout.
        ///
        /// The read loop is started here rather than in the constructor
        /// because it needs shared_from_this().
        static ptr start(std::string command, std::vector<std::string> args, ProcessManager& mgr) {
            ptr p(new patchProcess(std::move(command), std::move(args), mgr));
            p->output_read_loop();
            return p;
        }

        /// Returns the next buffered line of child output (without the '\n'),
        /// or boost::none when no line is available yet.
        ///
        /// Once the output stream has ended, data that was not terminated by
        /// a newline is returned as a final line instead of being withheld.
        boost::optional<std::string> getline() {
            std::lock_guard<std::mutex> lk(_mx);
            bool const complete_line = has_newline(_output.data());
            bool const trailing_data = _output_closed && _output.size() > 0;
            if (complete_line || trailing_data) {
                std::string s;
                std::istream is(&_output);
                if (std::getline(is, s))
                    return s;
            }
            return boost::none;
        }

        /// Queues `message` for the child's stdin. The write loop is started
        /// only when this is the sole queued entry; otherwise the completion
        /// handler of the write already in flight picks it up.
        void write(std::string message) {
            std::lock_guard<std::mutex> lk(_mx);
            _input_bufs.push_back({false, std::move(message)});

            if (_input_bufs.size() == 1)
                input_write_loop();
        }

        /// Requests that the child's stdin be closed. With an empty queue the
        /// close is posted to the strand right away; otherwise an end-of-input
        /// marker is queued behind the pending messages.
        void close_stdin() {
            std::lock_guard<std::mutex> lk(_mx);
            if (_input_bufs.empty()) {
                _strand.post([this, self = shared_from_this()] { _stdin.close(); });
            } else {
                _input_bufs.push_back({true, {}});
            }
        }

        /// Reports whether the child is still running, as told by bp::child::running().
        bool is_running() { return _process.running(); }

      private:
        /// Creates the pipes on the manager's io_service and launches the
        /// child in the manager's process group with stdout/stdin redirected.
        patchProcess(std::string command, std::vector<std::string> args, ProcessManager& mgr)
            : _strand(mgr._service),
              _stdout(mgr._service), _stdin(mgr._service),
              _process(command, args, mgr._group, bp::std_out > _stdout, bp::std_in < _stdin, mgr._service)
        { }

        /// Reads child output up to the next '\n', moves it into `_output`
        /// and re-arms itself until a read reports an error (including EOF).
        void output_read_loop() {
            ba::async_read_until(_stdout, pending_output, "\n",
                _strand.wrap([this, self = shared_from_this()](error_code ec, size_t /*transferred*/) {
                    std::lock_guard<std::mutex> lk(_mx);
                    // Hand over whatever was received, also when the read
                    // failed, so a last line without '\n' is not dropped.
                    if (pending_output.size() > 0)
                        std::ostream(&_output) << &pending_output;

                    if (!ec)
                        output_read_loop();
                    else
                        _output_closed = true; // lets getline() flush the unterminated tail
                }));
        }

        /// Writes the front of the input queue, or closes stdin when the
        /// front entry is the end-of-input marker.
        void input_write_loop() { // assumes _mx locked
            if (!_input_bufs.empty()) {
                auto& msg = _input_bufs.front();
                if (msg.eof) {
                    _strand.post([this, self = shared_from_this()] { _stdin.close(); });
                } else {
                    // The buffer refers to the string stored in the list node,
                    // which stays in place until the handler pops it.
                    ba::async_write(_stdin, ba::buffer(_input_bufs.front().pay_load),
                        _strand.wrap([this, self = shared_from_this()](error_code ec, size_t /*transferred*/) {
                            std::lock_guard<std::mutex> lk(_mx);
                            _input_bufs.pop_front();
                            if (!ec)
                                input_write_loop();
                        }));
                }
            }
        }

        ba::io_service::strand _strand; // thread-safe

        // strand-local
        bp::async_pipe _stdout, _stdin;
        bp::child      _process;
        ba::streambuf  pending_output;

        // mutex protected
        std::mutex mutable _mx;
        struct out_message { bool eof; std::string pay_load; };
        std::list<out_message> _input_bufs; // iterator stability again!
        ba::streambuf          _output;
        bool                   _output_closed = false; // set once the stdout read loop has stopped

        // static helpers
        /// True when the buffer sequence contains at least one '\n'.
        template <typename T>
        static bool has_newline(T buffer) {
            return std::find(buffers_begin(buffer), buffers_end(buffer), '\n') != buffers_end(buffer);
        }
    };

    using Map = std::map<std::string, patchProcess::ptr>; // iterator stability required!
    Map processList;

    /// Body of the IO thread: keeps calling run() until it reports that no
    /// handler was executed. Exceptions escaping a handler are logged and
    /// the loop resumes.
    void eventloop() {
        for (;;) try {
            if (!_service.run()) break;
        } catch (std::exception const& e) {
            LOG(error) << "Exception in handler: " << e.what() << "\n";
        }
    }

  public:
    /// Starts the IO thread.
    ProcessManager() : io_thread([this] { eventloop(); }) { }

    /// Logs the status, releases the keep-alive work object, joins the IO
    /// thread and logs the status again.
    ~ProcessManager() {
        // status() queries each child and may throw; an exception leaving a
        // destructor would terminate the program and skip the join below.
        auto report = [this](char const* caption) {
            try {
                status(caption);
            } catch (std::exception const& e) {
                LOG(error) << "Exception in status: " << e.what() << "\n";
            } catch (...) {
                LOG(error) << "Unknown exception in status\n";
            }
        };

        report(__FUNCTION__);
        _keep.reset();
        io_thread.join();
        report(__FUNCTION__);
    }

    /// Logs one line per known process saying whether it is still running.
    /// @param caption prefix for every logged line
    void status(std::string const& caption = "Status") const {
        for (auto& p : processList) {
            LOG(info) << caption << ": '" << p.first << "' is " << (p.second->is_running()? "still running":"done") << "\n";
        }
    }

    /// Starts `command` with `args` and registers it under `name`.
    ///
    /// @return the new process, or an empty pointer when a process with the
    ///         same name is still running. A finished process of the same
    ///         name is replaced.
    patchProcess::ptr addNew(std::string name, std::string command, std::vector<std::string> args) {
        auto pit = processList.find(name);
        if (pit != processList.end()) {
            if (pit->second->is_running()) {
                LOG(error) << "Process already running ('" << name << "')\n";
                return {};
            }
            // TODO make sure process cleaned up etc.
        }
        LOG(info) << "Creating process for patch " << name << "\n";

        // Launch first, register second: if the launch throws, the map must
        // not be left holding a null entry that later calls would dereference.
        auto started = patchProcess::start(std::move(command), std::move(args), *this);
        processList[name] = started;
        return started;
    }
};
演示
最简单的运行方式是:
// Minimal demo: the manager is created and destroyed without any child
// process ever being added.
int main() {
    ProcessManager pm;
    return 0;
}
不出所料,它什么也不做就直接返回了。接下来,我们尝试:
int main() {
ProcessManager pm;
pm.addNew("sleeper", "/bin/bash", {"-c", "sleep 3" });
}
可以预见的是,它会在退出前等待 3 秒。它打印:
Creating process for patch sleeper
~ProcessManager: 'sleeper' is still running
~ProcessManager: 'sleeper' is done
但是等等!你不是特别说过不想等待吗?其实这里并没有强制等待!在此期间,你可以做任何想做的事。只是 ProcessManager
的析构函数在默认情况下会等待子进程结束。
让我们做一些 IO:
Live On Coliru
int main() {
ProcessManager pm;
auto ls = pm.addNew("listing", "/bin/ls", {"-ltr" });
boost::optional<std::string> l;
while ((l = ls->getline()) || ls->is_running()) {
if (l.is_initialized()) {
std::cout << "ls: " << std::quoted(*l) << std::endl;
l.reset();
}
}
}
输出:
Creating process for patch listing
ls: "total 172"
ls: "-rw-rw-rw- 1 2001 2000 5645 Feb 11 00:10 main.cpp"
ls: "-rwxr-xr-x 1 2001 2000 162784 Feb 11 00:10 a.out"
~ProcessManager: 'listing' is done
~ProcessManager: 'listing' is done
为了真正理解进程及其IO 是同步的,我们可以替换
auto ls = pm.addNew("listing", "/bin/ls", {"-ltr" });
为输出随时间逐步产生的版本:
auto ls = pm.addNew("listing", "/bin/bash", {"-c", "ls -ltr | while read line; do sleep 1; echo \"$line\"; done" });
现在,为了让它真正具有挑战性,让我们再添加一个子进程,并把 ls 的输出
发送给这个子进程
:
Live On Coliru
int main() {
ProcessManager pm;
auto ls = pm.addNew("listing", "/bin/bash", {"-c", "ls -ltr | while read line; do sleep 1; echo \"$line\"; done" });
auto xxd = pm.addNew("hex encoding", "/usr/bin/xxd", {});
boost::optional<std::string> l, x;
bool closed = false;
while ((l || (l = ls->getline())) || (x || (x = xxd->getline())) || ls->is_running() || xxd->is_running()) {
if (l.is_initialized()) {
xxd->write(std::move(*l) + '\n');
l.reset();
std::cout << "[forwarded from ls to xxd]" << std::endl;
} else {
if (!closed && !ls->is_running()) {
std::cout << "[closing input to xxd]" << std::endl;
xxd->close_stdin();
closed = true;
}
}
if (x.is_initialized()) {
std::cout << std::quoted(*x) << std::endl;
x.reset();
}
}
}
现在,在 Coliru 上,文件列表太短,没什么看头;但在我的系统上,您会得到如下输出:
Creating process for patch listing
Creating process for patch hex encoding
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
"00000000: 746f 7461 6c20 3733 3635 0a2d 7277 2d72 total 7365.-rw-r"
"00000010: 772d 722d 2d20 2031 2073 6568 6520 7365 w-r-- 1 sehe se"
"00000020: 6865 2020 2020 3133 3737 206d 6569 2031 he 1377 mei 1"
"00000030: 3020 2032 3031 3720 636d 616b 655f 696e 0 2017 cmake_in"
"00000040: 7374 616c 6c2e 636d 616b 650a 6c72 7778 stall.cmake.lrwx"
"00000050: 7277 7872 7778 2020 3120 7365 6865 2073 rwxrwx 1 sehe s"
"00000060: 6568 6520 2020 2020 2020 3820 6d65 6920 ehe 8 mei "
"00000070: 3234 2020 3230 3137 206d 6169 6e2e 6370 24 2017 main.cp"
"00000080: 7020 2d3e 2074 6573 742e 6370 700a 2d72 p -> test.cpp.-r"
"00000090: 772d 7277 2d72 2d2d 2020 3120 7365 6865 w-rw-r-- 1 sehe"
"000000a0: 2073 6568 6520 2020 2020 3531 3420 7365 sehe 514 se"
"000000b0: 7020 3133 2030 383a 3336 2063 6f6d 7069 p 13 08:36 compi"
"000000c0: 6c65 5f63 6f6d 6d61 6e64 732e 6a73 6f6e le_commands.json"
"000000d0: 0a2d 7277 2d72 772d 722d 2d20 2031 2073 .-rw-rw-r-- 1 s"
"000000e0: 6568 6520 7365 6865 2020 2020 3135 3834 ehe sehe 1584"
"000000f0: 2073 6570 2032 3020 3232 3a30 3320 576f sep 20 22:03 Wo"
"00000100: 7264 436f 756e 7465 722e 680a 2d72 772d rdCounter.h.-rw-"
"00000110: 7277 2d72 2d2d 2020 3120 7365 6865 2073 rw-r-- 1 sehe s"
"00000120: 6568 6520 2020 2020 3336 3920 7365 7020 ehe 369 sep "
"00000130: 3233 2030 333a 3131 2063 6f6d 6d6f 6e2e 23 03:11 common."
"00000140: 680a 2d72 772d 7277 2d72 2d2d 2020 3120 h.-rw-rw-r-- 1 "
"00000150: 7365 6865 2073 6568 6520 2020 2020 3533 sehe sehe 53"
"00000160: 3920 7365 7020 3233 2030 333a 3131 2073 9 sep 23 03:11 s"
"00000170: 7472 7563 7473 616d 706c 652e 6870 700a tructsample.hpp."
"00000180: 2d72 772d 7277 2d72 2d2d 2020 3120 7365 -rw-rw-r-- 1 se"
"00000190: 6865 2073 6568 6520 2020 2032 3335 3220 he sehe 2352 "
"000001a0: 7365 7020 3238 2032 333a 3230 2061 6461 sep 28 23:20 ada"
"000001b0: 7074 6976 655f 7061 7273 6572 2e68 0a2d ptive_parser.h.-"
"000001c0: 7277 2d72 772d 722d 2d20 2031 2073 6568 rw-rw-r-- 1 seh"
"000001d0: 6520 7365 6865 2020 2020 3538 3738 2073 e sehe 5878 s"
"000001e0: 6570 2032 3820 3233 3a32 3120 6164 6170 ep 28 23:21 adap"
"000001f0: 7469 7665 5f70 6172 7365 722e 6370 700a tive_parser.cpp."
"00000200: 2d72 772d 7277 2d72 2d2d 2020 3120 7365 -rw-rw-r-- 1 se"
"00000210: 6865 2073 6568 6520 2020 2034 3232 3720 he sehe 4227 "
"00000220: 6f6b 7420 2034 2032 333a 3137 2070 686f okt 4 23:17 pho"
"00000230: 656e 695f 7833 2e68 7070 0a2d 7277 2d72 eni_x3.hpp.-rw-r"
"00000240: 772d 722d 2d20 2031 2073 6568 6520 7365 w-r-- 1 sehe se"
"00000250: 6865 2020 2031 3432 3035 2064 6563 2020 he 14205 dec "
"00000260: 3620 3231 3a30 3820 434d 616b 6543 6163 6 21:08 CMakeCac"
"00000270: 6865 2e74 7874 0a2d 7277 2d72 772d 722d he.txt.-rw-rw-r-"
"00000280: 2d20 2031 2073 6568 6520 7365 6865 2020 - 1 sehe sehe "
"00000290: 2020 3630 3738 2064 6563 2031 3420 3032 6078 dec 14 02"
"000002a0: 3a35 3320 636f 6e6e 6563 7469 6f6e 2e68 :53 connection.h"
"000002b0: 7070 0a2d 7277 7872 7778 722d 7820 2031 pp.-rwxrwxr-x 1"
"000002c0: 2073 6568 6520 7365 6865 2020 2020 3136 sehe sehe 16"
"000002d0: 3736 206a 616e 2031 3220 3032 3a34 3420 76 jan 12 02:44 "
"000002e0: 636f 6d70 696c 655f 6266 2e70 790a 2d72 compile_bf.py.-r"
"000002f0: 772d 722d 2d72 2d2d 2020 3120 7365 6865 w-r--r-- 1 sehe"
"00000300: 2073 6568 6520 2020 2038 3738 3020 6a61 sehe 8780 ja"
"00000310: 6e20 3132 2031 373a 3131 2074 6573 742e n 12 17:11 test."
"00000320: 6269 6e0a 2d72 7778 7277 7872 2d78 2020 bin.-rwxrwxr-x "
"00000330: 3120 7365 6865 2073 6568 6520 2020 2020 1 sehe sehe "
"00000340: 3131 3920 6a61 6e20 3235 2031 333a 3537 119 jan 25 13:57"
"00000350: 2074 6573 742e 7079 0a2d 7277 7872 7778 test.py.-rwxrwx"
"00000360: 722d 7820 2031 2073 6568 6520 7365 6865 r-x 1 sehe sehe"
"00000370: 2020 2020 2020 3736 2066 6562 2020 3820 76 feb 8 "
"00000380: 3130 3a33 3920 7465 7374 2e73 680a 2d72 10:39 test.sh.-r"
"00000390: 772d 7277 2d72 2d2d 2020 3120 7365 6865 w-rw-r-- 1 sehe"
"000003a0: 2073 6568 6520 2020 3236 3536 3920 6665 sehe 26569 fe"
"000003b0: 6220 2039 2031 313a 3533 2064 7261 6674 b 9 11:53 draft"
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[closing input to xxd]
"000003c0: 2e6d 640a 2d72 772d 7277 2d72 2d2d 2020 .md.-rw-rw-r-- "
"000003d0: 3120 7365 6865 2073 6568 6520 2020 2020 1 sehe sehe "
"000003e0: 3131 3620 6665 6220 2039 2031 313a 3534 116 feb 9 11:54"
"000003f0: 2069 6e70 7574 2e74 7874 0a2d 7277 2d72 input.txt.-rw-r"
"00000400: 772d 722d 2d20 2031 2073 6568 6520 7365 w-r-- 1 sehe se"
"00000410: 6865 2020 2020 2020 3739 2066 6562 2031 he 79 feb 1"
"00000420: 3020 3136 3a32 3420 6172 7869 760a 2d72 0 16:24 arxiv.-r"
"00000430: 772d 7277 2d72 2d2d 2020 3120 7365 6865 w-rw-r-- 1 sehe"
"00000440: 2073 6568 6520 2020 2032 3933 3520 6665 sehe 2935 fe"
"00000450: 6220 3130 2031 363a 3238 2043 4d61 6b65 b 10 16:28 CMake"
"00000460: 4c69 7374 732e 7478 740a 2d72 772d 7277 Lists.txt.-rw-rw"
"00000470: 2d72 2d2d 2020 3120 7365 6865 2073 6568 -r-- 1 sehe seh"
"00000480: 6520 2020 2035 3134 3520 6665 6220 3130 e 5145 feb 10"
"00000490: 2031 363a 3238 204d 616b 6566 696c 650a 16:28 Makefile."
"000004a0: 2d72 772d 7277 2d72 2d2d 2020 3120 7365 -rw-rw-r-- 1 se"
"000004b0: 6865 2073 6568 6520 2020 2033 3937 3620 he sehe 3976 "
"000004c0: 6665 6220 3130 2031 363a 3430 2074 6573 feb 10 16:40 tes"
"000004d0: 7431 2e63 7070 0a2d 7277 2d72 772d 722d t1.cpp.-rw-rw-r-"
"000004e0: 2d20 2031 2073 6568 6520 7365 6865 2020 - 1 sehe sehe "
"000004f0: 2020 3632 3434 2066 6562 2031 3120 3031 6244 feb 11 01"
"00000500: 3a31 3320 7465 7374 2e63 7070 0a2d 7277 :13 test.cpp.-rw"
"00000510: 7872 7778 722d 7820 2031 2073 6568 6520 xrwxr-x 1 sehe "
"00000520: 7365 6865 2037 3139 3336 3838 2066 6562 sehe 7193688 feb"
"00000530: 2031 3120 3031 3a31 3320 736f 7465 7374 11 01:13 sotest"
"00000540: 0a2d 7277 2d72 772d 722d 2d20 2031 2073 .-rw-rw-r-- 1 s"
"00000550: 6568 6520 7365 6865 2020 2020 3535 3132 ehe sehe 5512"
"00000560: 2066 6562 2031 3120 3031 3a31 3620 5365 feb 11 01:16 Se"
"00000570: 7373 696f 6e2e 7669 6d0a 6472 7778 7277 ssion.vim.drwxrw"
"00000580: 7872 2d78 2031 3120 7365 6865 2073 6568 xr-x 11 sehe seh"
"00000590: 6520 2020 2020 2032 3320 6665 6220 3131 e 23 feb 11"
"000005a0: 2030 313a 3137 2043 4d61 6b65 4669 6c65 01:17 CMakeFile"
"000005b0: 730a 2d72 772d 7277 2d72 2d2d 2020 3120 s.-rw-rw-r-- 1 "
"000005c0: 7365 6865 2073 6568 6520 2020 2020 2037 sehe sehe 7"
"000005d0: 3520 6665 6220 3131 2030 313a 3137 206f 5 feb 11 01:17 o"
"000005e0: 7574 7075 742e 7478 740a utput.txt."
~ProcessManager: 'hex encoding' is done
~ProcessManager: 'listing' is done
~ProcessManager: 'hex encoding' is done
~ProcessManager: 'listing' is done