2

我正在开发一个应用程序,我需要根据用户输入启动和停止各种不同的可执行文件。我希望我的“核心”程序在这些可执行文件运行时正常运行,即不要等待它们的终止,这在理论上可能是无限的。除此之外,我还需要能够接收 std_out 并将 std_in 发送到这些可执行文件。

目前我有一个设置,我有一个流程管理器类:

class ProcessManager {
private:
    std::vector<patchProcess> processList;
    boost::process::group processGroup;
public:
    ProcessManager();
    void addNew(std::string name,std::string command, std::string args);
    void killAll();
    void printAllIn();
};

补丁过程在哪里:

struct patchProcess {
    std::string name;
    boost::process::child *process;
    std::shared_ptr<boost::process::ipstream> procOutStream;
};

我可以在哪里使用该功能启动/添加新进程

void bbefxProcessManager::addNew(std::string name, std::string command, std::string args) {
    LOG(info) << "Creating process for patch " << name;
    patchProcess pp;
    pp.name = name;
    pp.procOutStream = std::shared_ptr<boost::process::ipstream>(new boost::process::ipstream);
    boost::process::child newProc(command,args,processGroup,boost::process::std_out > *pp.procOutStream);
    pp.process = &newProc;
    processList.push_back(pp);
}

我的打印尝试:

void bbefxProcessManager::printAllIn() {
    std::string line;
        for (auto &proc : processList) {
            std::getline(*proc.procOutStream, line);
            std::cout << line << std::endl;
        }
}

此代码成功启动了该过程,但是 readAllIn 给了我一个空白输出。我有一种感觉,我做错了什么std::shared_ptr<boost::process::ipstream> procOutStream;。我这样做的理由是我正在使用push_back我的 processList(结构向量),所以它应该是可复制的。我可以在不使用 patchProcess 结构和这些共享指针的情况下获得测试 exec 的输出,但这会使管理变得困难/混乱。我还可以确认,如果我尝试使用以下内容读取 addNew 函数中的输出:

while(true) {
        *pp.procOutStream >> line;
        std::cout << line << std::endl;

    }

我得到了我的可执行文件的输出。那么这是否暗示复制构造函数出了问题?

4

2 回答 2

2

在您进行编辑之前,我开始研究一种真正的异步方法:

让我们把手续办妥:

#include <boost/asio.hpp>
#include <boost/process.hpp>
#include <boost/process/async.hpp>

namespace ba = boost::asio;
namespace bp = boost::process;

#include <iostream>
#define LOG(x) std::clog

现在让我们创建一个在析构函数中关闭ProcessManager的单个上运行所有进程的进程。io_service

IO 服务用于调度所有工作(如异步 IO)。我有

  • 随机决定专注于面向行的IO操作
  • 决定可能没有理由使用超过 1 个 IO 线程,但如果您愿意的话,有一个strand可以正确同步与孩子相关的操作。
#include <map>
#include <list>
#include <thread>
class ProcessManager { // ugh naming smell
    using error_code = boost::system::error_code;
  private:
    ba::io_service _service;
    boost::optional<ba::io_service::work> _keep{_service};
    boost::process::group _group;
    std::thread io_thread;

    struct patchProcess : std::enable_shared_from_this<patchProcess> {
        using ptr = std::shared_ptr<patchProcess>;
        static ptr start(std::string command, std::vector<std::string> args, ProcessManager& mgr) {
            ptr p(new patchProcess(std::move(command), std::move(args), mgr));
            p->output_read_loop();
            return p;
        }

        boost::optional<std::string> getline() {
            std::lock_guard<std::mutex> lk(_mx);
            std::string s;
            if (has_newline(_output.data()) && std::getline(std::istream(&_output), s))
                return s;
            return boost::none;
        }

        void write(std::string message) {
            std::lock_guard<std::mutex> lk(_mx);
            _input_bufs.push_back({false, std::move(message)});

            if (_input_bufs.size() == 1)
                input_write_loop();
        }

        void close_stdin() {
            std::lock_guard<std::mutex> lk(_mx);
            if (_input_bufs.empty()) {
                _strand.post([this, self=shared_from_this()] { _stdin.close(); });
            } else {
                _input_bufs.push_back({true, {}});
            }
        }

        bool is_running() { return _process.running(); }

      private:
        patchProcess(std::string command, std::vector<std::string> args, ProcessManager& mgr)
            : _strand(mgr._service),
              _stdout(mgr._service), _stdin(mgr._service),
              _process(command, args, mgr._group, bp::std_out > _stdout, bp::std_in < _stdin, mgr._service)
        { }

        void output_read_loop() {
            ba::async_read_until(_stdout, pending_output, "\n", _strand.wrap([this, self=shared_from_this()](error_code ec, size_t /*transferred*/) {
                if (!ec) {
                    std::lock_guard<std::mutex> lk(_mx);
                    std::ostream(&_output) << &pending_output;
                    output_read_loop();
                }
            }));
        }

        void input_write_loop() { // assumes _mx locked
            if (!_input_bufs.empty()) {
                auto& msg = _input_bufs.front();
                if (msg.eof) {
                    _strand.post([this, self=shared_from_this()] { _stdin.close(); });
                } else {
                    ba::async_write(_stdin, ba::buffer(_input_bufs.front().pay_load), 
                        _strand.wrap([this, self=shared_from_this()](error_code ec, size_t /*transferred*/) {
                            std::lock_guard<std::mutex> lk(_mx);
                            _input_bufs.pop_front();
                            if (!ec)
                                input_write_loop();
                        }));
                }
            }
        }

        ba::io_service::strand _strand; // thread-safe

        // strand-local
        bp::async_pipe _stdout, _stdin;
        bp::child _process;
        ba::streambuf pending_output;

        // mutex protected
        std::mutex mutable _mx;
        struct out_message { bool eof; std::string pay_load; };
        std::list<out_message> _input_bufs; // iterator stability again!
        ba::streambuf _output;

        // static helpers
        template <typename T>
        static bool has_newline(T buffer) {
            return std::find(buffers_begin(buffer), buffers_end(buffer), '\n') != buffers_end(buffer);
        }
    };

    using Map = std::map<std::string, patchProcess::ptr>; // iterator stability required!
    Map processList;

    void eventloop() {
        for(;;) try {
            if (!_service.run()) break;
        } catch(std::exception const& e) {
            LOG(error) << "Exception in handler: " << e.what() << "\n";
        }
    }
  public:
    ProcessManager() : io_thread([this] { eventloop(); }) { }

    ~ProcessManager() {
        status(__FUNCTION__);
        _keep.reset();
        io_thread.join();
        status(__FUNCTION__);
    }

    void status(std::string const& caption = "Status") const {
        for (auto& p : processList) {
            LOG(info) << caption << ": '" << p.first << "' is " << (p.second->is_running()? "still running":"done") << "\n";
        }
    }

    patchProcess::ptr addNew(std::string name, std::string command, std::vector<std::string> args) {
        auto pit = processList.find(name);
        if (pit != processList.end()) {
            if (pit->second->is_running()) {
                LOG(error) << "Process already running ('" << name << "')\n";
                return {};
            }
            // TODO make sure process cleaned up etc.
        }
        LOG(info) << "Creating process for patch " << name << "\n";
        return processList[name] = patchProcess::start(std::move(command), std::move(args), *this);
    }
};

演示

最天真的运行是:

int main() {
    ProcessManager pm;
}

可以预见的是,什么都不做后会返回。接下来,我们尝试

int main() {
    ProcessManager pm;
    pm.addNew("sleeper", "/bin/bash", {"-c", "sleep 3" });
}

可以预见的是,它会在退出前等待 3 秒。它打印:

Creating process for patch sleeper
~ProcessManager: 'sleeper' is still running
~ProcessManager: 'sleeper' is done

但是等等!你不是特别说不想等吗?好吧,没有!在此期间,你可以做任何你想做的事。只是ProcessManager' 的析构函数将 - 默认情况下 - 等待孩子完成。

让我们做一些 IO:

Live On Coliru

int main() {
    ProcessManager pm;

    auto ls  = pm.addNew("listing", "/bin/ls", {"-ltr" });

    boost::optional<std::string> l;

    while ((l = ls->getline()) || ls->is_running()) {
        if (l.is_initialized()) {
            std::cout << "ls: " << std::quoted(*l) << std::endl;
            l.reset();
        }
    }
}

印刷

Creating process for patch listing
ls: "total 172"
ls: "-rw-rw-rw- 1 2001 2000   5645 Feb 11 00:10 main.cpp"
ls: "-rwxr-xr-x 1 2001 2000 162784 Feb 11 00:10 a.out"
~ProcessManager: 'listing' is done
~ProcessManager: 'listing' is done

为了真正理解进程及其IO 是同步的,我们可以替换

auto ls = pm.addNew("listing", "/bin/ls", {"-ltr" });

有更多时变的东西:

auto ls = pm.addNew("listing", "/bin/bash", {"-c", "ls -ltr | while read line; do sleep 1; echo \"$line\"; done" });

现在,为了使它真正具有挑战性,让我们添加另一个子进程并将输出发送ls到另一个child

Live On Coliru

int main() {
    ProcessManager pm;

    auto ls  = pm.addNew("listing", "/bin/bash", {"-c", "ls -ltr | while read line; do sleep 1; echo \"$line\"; done" });
    auto xxd = pm.addNew("hex encoding", "/usr/bin/xxd", {});

    boost::optional<std::string> l, x;

    bool closed = false;
    while ((l || (l = ls->getline())) || (x || (x = xxd->getline())) || ls->is_running() || xxd->is_running()) {
        if (l.is_initialized()) {
            xxd->write(std::move(*l) + '\n');
            l.reset();
            std::cout << "[forwarded from ls to xxd]" << std::endl;
        } else {
            if (!closed && !ls->is_running()) {
                std::cout << "[closing input to xxd]" << std::endl;
                xxd->close_stdin();
                closed = true;
            }
        }

        if (x.is_initialized()) {
            std::cout << std::quoted(*x) << std::endl;
            x.reset();
        }
    }
}

现在,在 Coliru 上,列表太小而令人感兴趣,但在我的系统上,您会得到如下输出:

Creating process for patch listing
Creating process for patch hex encoding
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
"00000000: 746f 7461 6c20 3733 3635 0a2d 7277 2d72  total 7365.-rw-r"
"00000010: 772d 722d 2d20 2031 2073 6568 6520 7365  w-r--  1 sehe se"
"00000020: 6865 2020 2020 3133 3737 206d 6569 2031  he    1377 mei 1"
"00000030: 3020 2032 3031 3720 636d 616b 655f 696e  0  2017 cmake_in"
"00000040: 7374 616c 6c2e 636d 616b 650a 6c72 7778  stall.cmake.lrwx"
"00000050: 7277 7872 7778 2020 3120 7365 6865 2073  rwxrwx  1 sehe s"
"00000060: 6568 6520 2020 2020 2020 3820 6d65 6920  ehe       8 mei "
"00000070: 3234 2020 3230 3137 206d 6169 6e2e 6370  24  2017 main.cp"
"00000080: 7020 2d3e 2074 6573 742e 6370 700a 2d72  p -> test.cpp.-r"
"00000090: 772d 7277 2d72 2d2d 2020 3120 7365 6865  w-rw-r--  1 sehe"
"000000a0: 2073 6568 6520 2020 2020 3531 3420 7365   sehe     514 se"
"000000b0: 7020 3133 2030 383a 3336 2063 6f6d 7069  p 13 08:36 compi"
"000000c0: 6c65 5f63 6f6d 6d61 6e64 732e 6a73 6f6e  le_commands.json"
"000000d0: 0a2d 7277 2d72 772d 722d 2d20 2031 2073  .-rw-rw-r--  1 s"
"000000e0: 6568 6520 7365 6865 2020 2020 3135 3834  ehe sehe    1584"
"000000f0: 2073 6570 2032 3020 3232 3a30 3320 576f   sep 20 22:03 Wo"
"00000100: 7264 436f 756e 7465 722e 680a 2d72 772d  rdCounter.h.-rw-"
"00000110: 7277 2d72 2d2d 2020 3120 7365 6865 2073  rw-r--  1 sehe s"
"00000120: 6568 6520 2020 2020 3336 3920 7365 7020  ehe     369 sep "
"00000130: 3233 2030 333a 3131 2063 6f6d 6d6f 6e2e  23 03:11 common."
"00000140: 680a 2d72 772d 7277 2d72 2d2d 2020 3120  h.-rw-rw-r--  1 "
"00000150: 7365 6865 2073 6568 6520 2020 2020 3533  sehe sehe     53"
"00000160: 3920 7365 7020 3233 2030 333a 3131 2073  9 sep 23 03:11 s"
"00000170: 7472 7563 7473 616d 706c 652e 6870 700a  tructsample.hpp."
"00000180: 2d72 772d 7277 2d72 2d2d 2020 3120 7365  -rw-rw-r--  1 se"
"00000190: 6865 2073 6568 6520 2020 2032 3335 3220  he sehe    2352 "
"000001a0: 7365 7020 3238 2032 333a 3230 2061 6461  sep 28 23:20 ada"
"000001b0: 7074 6976 655f 7061 7273 6572 2e68 0a2d  ptive_parser.h.-"
"000001c0: 7277 2d72 772d 722d 2d20 2031 2073 6568  rw-rw-r--  1 seh"
"000001d0: 6520 7365 6865 2020 2020 3538 3738 2073  e sehe    5878 s"
"000001e0: 6570 2032 3820 3233 3a32 3120 6164 6170  ep 28 23:21 adap"
"000001f0: 7469 7665 5f70 6172 7365 722e 6370 700a  tive_parser.cpp."
"00000200: 2d72 772d 7277 2d72 2d2d 2020 3120 7365  -rw-rw-r--  1 se"
"00000210: 6865 2073 6568 6520 2020 2034 3232 3720  he sehe    4227 "
"00000220: 6f6b 7420 2034 2032 333a 3137 2070 686f  okt  4 23:17 pho"
"00000230: 656e 695f 7833 2e68 7070 0a2d 7277 2d72  eni_x3.hpp.-rw-r"
"00000240: 772d 722d 2d20 2031 2073 6568 6520 7365  w-r--  1 sehe se"
"00000250: 6865 2020 2031 3432 3035 2064 6563 2020  he   14205 dec  "
"00000260: 3620 3231 3a30 3820 434d 616b 6543 6163  6 21:08 CMakeCac"
"00000270: 6865 2e74 7874 0a2d 7277 2d72 772d 722d  he.txt.-rw-rw-r-"
"00000280: 2d20 2031 2073 6568 6520 7365 6865 2020  -  1 sehe sehe  "
"00000290: 2020 3630 3738 2064 6563 2031 3420 3032    6078 dec 14 02"
"000002a0: 3a35 3320 636f 6e6e 6563 7469 6f6e 2e68  :53 connection.h"
"000002b0: 7070 0a2d 7277 7872 7778 722d 7820 2031  pp.-rwxrwxr-x  1"
"000002c0: 2073 6568 6520 7365 6865 2020 2020 3136   sehe sehe    16"
"000002d0: 3736 206a 616e 2031 3220 3032 3a34 3420  76 jan 12 02:44 "
"000002e0: 636f 6d70 696c 655f 6266 2e70 790a 2d72  compile_bf.py.-r"
"000002f0: 772d 722d 2d72 2d2d 2020 3120 7365 6865  w-r--r--  1 sehe"
"00000300: 2073 6568 6520 2020 2038 3738 3020 6a61   sehe    8780 ja"
"00000310: 6e20 3132 2031 373a 3131 2074 6573 742e  n 12 17:11 test."
"00000320: 6269 6e0a 2d72 7778 7277 7872 2d78 2020  bin.-rwxrwxr-x  "
"00000330: 3120 7365 6865 2073 6568 6520 2020 2020  1 sehe sehe     "
"00000340: 3131 3920 6a61 6e20 3235 2031 333a 3537  119 jan 25 13:57"
"00000350: 2074 6573 742e 7079 0a2d 7277 7872 7778   test.py.-rwxrwx"
"00000360: 722d 7820 2031 2073 6568 6520 7365 6865  r-x  1 sehe sehe"
"00000370: 2020 2020 2020 3736 2066 6562 2020 3820        76 feb  8 "
"00000380: 3130 3a33 3920 7465 7374 2e73 680a 2d72  10:39 test.sh.-r"
"00000390: 772d 7277 2d72 2d2d 2020 3120 7365 6865  w-rw-r--  1 sehe"
"000003a0: 2073 6568 6520 2020 3236 3536 3920 6665   sehe   26569 fe"
"000003b0: 6220 2039 2031 313a 3533 2064 7261 6674  b  9 11:53 draft"
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[forwarded from ls to xxd]
[closing input to xxd]
"000003c0: 2e6d 640a 2d72 772d 7277 2d72 2d2d 2020  .md.-rw-rw-r--  "
"000003d0: 3120 7365 6865 2073 6568 6520 2020 2020  1 sehe sehe     "
"000003e0: 3131 3620 6665 6220 2039 2031 313a 3534  116 feb  9 11:54"
"000003f0: 2069 6e70 7574 2e74 7874 0a2d 7277 2d72   input.txt.-rw-r"
"00000400: 772d 722d 2d20 2031 2073 6568 6520 7365  w-r--  1 sehe se"
"00000410: 6865 2020 2020 2020 3739 2066 6562 2031  he      79 feb 1"
"00000420: 3020 3136 3a32 3420 6172 7869 760a 2d72  0 16:24 arxiv.-r"
"00000430: 772d 7277 2d72 2d2d 2020 3120 7365 6865  w-rw-r--  1 sehe"
"00000440: 2073 6568 6520 2020 2032 3933 3520 6665   sehe    2935 fe"
"00000450: 6220 3130 2031 363a 3238 2043 4d61 6b65  b 10 16:28 CMake"
"00000460: 4c69 7374 732e 7478 740a 2d72 772d 7277  Lists.txt.-rw-rw"
"00000470: 2d72 2d2d 2020 3120 7365 6865 2073 6568  -r--  1 sehe seh"
"00000480: 6520 2020 2035 3134 3520 6665 6220 3130  e    5145 feb 10"
"00000490: 2031 363a 3238 204d 616b 6566 696c 650a   16:28 Makefile."
"000004a0: 2d72 772d 7277 2d72 2d2d 2020 3120 7365  -rw-rw-r--  1 se"
"000004b0: 6865 2073 6568 6520 2020 2033 3937 3620  he sehe    3976 "
"000004c0: 6665 6220 3130 2031 363a 3430 2074 6573  feb 10 16:40 tes"
"000004d0: 7431 2e63 7070 0a2d 7277 2d72 772d 722d  t1.cpp.-rw-rw-r-"
"000004e0: 2d20 2031 2073 6568 6520 7365 6865 2020  -  1 sehe sehe  "
"000004f0: 2020 3632 3434 2066 6562 2031 3120 3031    6244 feb 11 01"
"00000500: 3a31 3320 7465 7374 2e63 7070 0a2d 7277  :13 test.cpp.-rw"
"00000510: 7872 7778 722d 7820 2031 2073 6568 6520  xrwxr-x  1 sehe "
"00000520: 7365 6865 2037 3139 3336 3838 2066 6562  sehe 7193688 feb"
"00000530: 2031 3120 3031 3a31 3320 736f 7465 7374   11 01:13 sotest"
"00000540: 0a2d 7277 2d72 772d 722d 2d20 2031 2073  .-rw-rw-r--  1 s"
"00000550: 6568 6520 7365 6865 2020 2020 3535 3132  ehe sehe    5512"
"00000560: 2066 6562 2031 3120 3031 3a31 3620 5365   feb 11 01:16 Se"
"00000570: 7373 696f 6e2e 7669 6d0a 6472 7778 7277  ssion.vim.drwxrw"
"00000580: 7872 2d78 2031 3120 7365 6865 2073 6568  xr-x 11 sehe seh"
"00000590: 6520 2020 2020 2032 3320 6665 6220 3131  e      23 feb 11"
"000005a0: 2030 313a 3137 2043 4d61 6b65 4669 6c65   01:17 CMakeFile"
"000005b0: 730a 2d72 772d 7277 2d72 2d2d 2020 3120  s.-rw-rw-r--  1 "
"000005c0: 7365 6865 2073 6568 6520 2020 2020 2037  sehe sehe      7"
"000005d0: 3520 6665 6220 3131 2030 313a 3137 206f  5 feb 11 01:17 o"
"000005e0: 7574 7075 742e 7478 740a                 utput.txt."
~ProcessManager: 'hex encoding' is done
~ProcessManager: 'listing' is done
~ProcessManager: 'hex encoding' is done
~ProcessManager: 'listing' is done
于 2018-02-11T00:18:04.690 回答
0

我发现我忽略了 boost::process::child *process;struct member ,当与共享指针一起使用它时,ipstream我得到了预期的结果。

不过,我将保留这个问题,因为我仍然不完全高兴我以正确的方式做到这一点,并希望得到一些意见。

于 2018-02-11T00:01:20.773 回答