1

我正在通过 boost::process 运行一个外部进程。该进程使用 std::cout 和 std::cerr 输出一些信息,我需要获取这些信息。我希望在存储这些输出时,能够保留它们的先后顺序和严重级别(即输出来自 cout 还是 cerr)。

但受限于 boost::process 重定向输出的方式,我无法做到这一点。我只能把 std::cout 重定向到一个 ipstream,把 std::cerr 重定向到另一个 ipstream;这样在读取它们时,就无法保留原来的顺序。

下面是一个复现该问题的 MCVE(最小、完整、可验证的示例):

#include <iostream>

#include <boost/process.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>

// Performs one polling round: tries to take a single line from the child's
// stdout stream and then a single line from its stderr stream, always in that
// fixed order. Every line obtained is appended to *out with a "cout: " or
// "cerr: " tag so the origin of the line is kept.
void doReadOutput( boost::process::ipstream* is, boost::process::ipstream* err, std::ostream* out )
{
    std::string text;

    // Shared step for both streams: read one line and, on success, tag and store it.
    const auto forwardOneLine = [&text, out]( boost::process::ipstream& source, const char* tag )
    {
        if ( !std::getline( source, text ) )
        {
            return;
        }
        *out << tag << text << std::endl;
    };

    forwardOneLine( *is, "cout: " );
    forwardOneLine( *err, "cerr: " );
}

// Reader-thread body: keeps calling doReadOutput() for as long as
// *continueFlag is true, then calls it one more time after the flag has been
// observed false.
//
// is, err       child stdout / stderr pipe streams to read from
// out           stream that receives the tagged lines
// continueFlag  cleared by the owner to request that the loop stop
void readOutput( boost::process::ipstream* is, boost::process::ipstream* err, std::ostream* out, std::atomic_bool* continueFlag )
{
    while ( *continueFlag )
    {
        doReadOutput( is, err, out );
    }

    // get last outputs that may remain in buffers
    doReadOutput( is, err, out );
}

// Entry point for both roles of this MCVE.
//  - Without extra arguments (argc == 1) it re-launches its own executable
//    with the argument "foo", collects the child's stdout/stderr through two
//    separate pipe streams and prints what was collected.
//  - With any extra argument it plays the child: it writes a fixed sequence of
//    interleaved lines to std::cerr and std::cout.
int main( int argc, char* argv[] )
{
    if ( argc == 1 )
    {
        // run this same program with "foo" as parameter, to enter "else" statement below from a different process
        try
        {
            boost::process::ipstream is_stream, err_stream;
            std::stringstream merged_output;
            std::atomic_bool continueFlag = true;

            // Child's stdout goes to is_stream, its stderr to err_stream.
            boost::process::child child( argv[0],
                                         std::vector<std::string>{ "foo" },
                                         boost::process::std_out > is_stream,
                                         boost::process::std_err > err_stream );

            // A single thread runs readOutput() over both streams, writing into merged_output.
            boost::thread thrd( boost::bind( readOutput, &is_stream, &err_stream, &merged_output, &continueFlag ) );

            child.wait();

            // The child has exited: ask the reader loop to stop, then wait for it.
            continueFlag = false;

            thrd.join();

            std::cout << "Program output was:" << std::endl;
            std::cout << merged_output.str();
        }
        catch ( const boost::process::process_error& err )
        {
            std::cerr << "Error: " << err.code() << std::endl;
        }
        catch (...)                                                                                                                                      // @NOCOVERAGE
        {                                                     
            std::cerr << "Unknown error" << std::endl;
        }
    }
    else
    {
        // program invoked through boost::process by "if" statement above

        // Fixed script of interleaved stderr/stdout lines; this is the order
        // the parent would like to see reproduced in its merged output.
        std::cerr << "Error1" << std::endl;
        std::cout << "Hello World1" << std::endl;
        std::cerr << "Error2" << std::endl;
        std::cerr << "Error3" << std::endl;
        std::cerr << "Error4" << std::endl;
        std::cerr << "Error5" << std::endl;
        std::cout << "Hello World2" << std::endl;
        std::cerr << "Error6" << std::endl;
        std::cout << "Hello World3" << std::endl;
    }

    return 0;
}

当我执行这个程序(在 Windows 10 下使用 Visual Studio 2019 编译)时,它输出:

Program output was:
cout: Hello World1
cerr: Error1
cout: Hello World2
cerr: Error2
cout: Hello World3
cerr: Error3
cerr: Error4
cerr: Error5
cerr: Error6

虽然我想要:

Program output was:
cerr: Error1
cout: Hello World1
cerr: Error2
cerr: Error3
cerr: Error4
cerr: Error5
cout: Hello World2
cerr: Error6
cout: Hello World3

有没有办法做到这一点?


编辑:按照用户 Some programmer dude 的建议,为每个输出流各创建了一个线程:

#include <iostream>

#include <boost/process.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/thread/mutex.hpp>

// Tries to read a single line from *str. When a line is obtained it is written
// to *out as "<prefix>: <line>" while holding *mutex, so that the two reader
// threads sharing *out do not write at the same time. Does nothing when no
// line could be read.
void doReadOutput( boost::process::ipstream* str, std::ostream* out, const std::string& prefix, boost::mutex* mutex )
{
    std::string text;
    const bool gotLine = static_cast<bool>( std::getline( *str, text ) );
    if ( !gotLine )
    {
        return;
    }

    // Hold the lock only for the write to the shared output stream.
    boost::mutex::scoped_lock guard( *mutex );
    *out << prefix << ": " << text << std::endl;
}

// Per-stream reader-thread body. While *continueFlag is set it reads one line
// at a time through doReadOutput() and yields between rounds; once the flag is
// observed cleared it makes one last read attempt before returning.
void readOutput( boost::process::ipstream* str, std::ostream* out, std::string prefix, boost::mutex* mutex, std::atomic_bool* continueFlag )
{
    for ( ;; )
    {
        if ( !continueFlag->load() )
        {
            break;
        }

        doReadOutput( str, out, prefix, mutex );
        boost::thread::yield();
    }

    // final attempt: pick up output that may still be sitting in the buffers
    doReadOutput( str, out, prefix, mutex );
}

// Entry point for both roles of the second MCVE (one reader thread per stream).
//  - Without extra arguments (argc == 1) it re-launches its own executable
//    with the argument "foo", reads the child's stdout and stderr on two
//    separate threads into one shared stringstream, and prints the result.
//  - With any extra argument it plays the child: it writes a fixed sequence of
//    interleaved lines to std::cerr and std::cout.
int main( int argc, char* argv[] )
{
    if ( argc == 1 )
    {
        // run this same program with "foo" as parameter, to enter "else" statement below from a different process
        try
        {
            boost::process::ipstream is_stream, err_stream;

            std::stringstream merged_output;
            std::atomic_bool continueFlag = true;

            // Child's stdout goes to is_stream, its stderr to err_stream.
            boost::process::child child( argv[0],
                                         std::vector<std::string>{ "foo" },
                                         boost::process::std_out > is_stream,
                                         boost::process::std_err > err_stream );

            // One reader thread per stream; both write to merged_output under the same mutex.
            boost::mutex mutex;
            boost::thread thrdis( boost::bind( readOutput, &is_stream, &merged_output, "cout", &mutex, &continueFlag ) );
            boost::thread thrderr( boost::bind( readOutput, &err_stream, &merged_output, "cerr", &mutex, &continueFlag ) );

            child.wait();

            // The child has exited: ask both reader loops to stop, then wait for them.
            continueFlag = false;

            thrdis.join();
            thrderr.join();

            std::cout << "Program output was:" << std::endl;
            std::cout << merged_output.str();
        }
        catch ( const boost::process::process_error& err )
        {
            std::cerr << "Error: " << err.code() << std::endl;
        }
        catch (...)                                                                                                                                      // @NOCOVERAGE
        {                                                     
            std::cerr << "Unknown error" << std::endl;
        }
    }
    else
    {
        // program invoked through boost::process by "if" statement above

        // Fixed script of interleaved stderr/stdout lines; this is the order
        // the parent would like to see reproduced in its merged output.
        std::cerr << "Error1" << std::endl;
        std::cout << "Hello World1" << std::endl;
        std::cerr << "Error2" << std::endl;
        std::cerr << "Error3" << std::endl;
        std::cerr << "Error4" << std::endl;
        std::cerr << "Error5" << std::endl;
        std::cout << "Hello World2" << std::endl;
        std::cerr << "Error6" << std::endl;
        std::cout << "Hello World3" << std::endl;
    }

    return 0;
}

然后输出是:

Program output was:
cerr: Error1
cout: Hello World1
cerr: Error2
cout: Hello World2
cerr: Error3
cout: Hello World3
cerr: Error4
cerr: Error5
cerr: Error6

还是出乎意料...

4

1 回答 1

1

您将需要非阻塞 IO。库中支持的方式是使用异步管道。

您需要为 stderr 和 stdout 各运行一个循环:

  • 用 async_read 读入缓冲区,直到得到至少一整行
  • 一旦有完整的行可用,就把它从输入缓冲区复制到输出缓冲区

由于最终会有两个几乎完全相同的循环,各自维护一份管道/缓冲区状态,所以把它封装成一个类型是合理的,例如:

    // Per-stream state for the asynchronous read loop: one async pipe, the
    // buffer that async reads fill, and a reference to the output string that
    // is shared between the stdout pump and the stderr pump.
    struct IoPump {
        // Binds the pipe to the given io_context and remembers the shared output string.
        IoPump(io_context& io, std::string& merged) : _pipe(io), _merged(merged) {}

        // Input buffer for data read from _pipe.
        boost::asio::streambuf _buf;
        // Pipe the child's stdout or stderr is redirected into.
        bp::async_pipe         _pipe;
        // Shared destination; not owned by this object.
        std::string&           _merged;

        // Starts (or continues) the chained asynchronous line read; defined separately.
        void do_loop();
    };

    // Single io_context that drives both pumps.
    io_context io;

    // Both pumps append to the same string.
    std::string merged;
    IoPump outp{io, merged}, errp{io, merged};

    // Redirect the child's stdout and stderr into the two async pipes.
    bp::child child(program, std::vector<std::string> { "foo" },
        bp::std_out > outp._pipe, bp::std_err > errp._pipe);

    outp.do_loop(); // prime the pump
    errp.do_loop(); // prime the pump
    // Runs the queued async reads and their completion handlers on this thread.
    io.run();

就这么简单。当然,还需要说明 IoPump::do_loop() 的具体实现:

// Issues one asynchronous read on _pipe into _buf that completes once "\n" is
// present, and re-arms itself from the completion handler so that reads are
// chained until an error is reported.
void do_loop() {
    boost::asio::async_read_until(_pipe, _buf, "\n",
        // `out` is a dynamic-buffer adapter over the shared _merged string.
        [this, out = boost::asio::dynamic_buffer(_merged)](
            error_code ec, size_t xfer) mutable {
            if (!ec) {
                // Append the first xfer bytes of _buf to _merged ...
                out.commit(buffer_copy(
                    out.prepare(xfer), _buf.data(), xfer));
                // ... and drop those bytes from the input buffer.
                _buf.consume(xfer);

                do_loop(); // chain
            } else {
                // Any error ends the chain for this pipe; the message is only logged.
                std::cerr << "IoPump: " << ec.message() << "\n";
            }
        });
}

注意

  • 您的主应用程序完全是单线程的
  • 意味着异步完成处理程序永远不会同时运行
  • 因此直接访问输出缓冲区 std::string merged; 是安全的,无需担心同步

现场演示

在 Coliru 上在线运行(Live On Coliru)

static void main_program(char const* program);
static void child_program();

// Dispatches between the two roles of the demo: with no extra command-line
// arguments it acts as the parent and passes its own path to main_program();
// otherwise it acts as the spawned child and runs child_program().
int main(int argc, char** argv) {
    const bool launched_as_child = (argc != 1);
    if (launched_as_child) {
        child_program();
        return 0;
    }

    main_program(argv[0]);
    return 0;
}

#include <iostream>
// Child role of the demo: writes a fixed script of nine lines, alternating
// between std::cerr and std::cout in a set order, flushing after every line.
static void child_program() {
    // One step of the script: which stream to write to and what to write.
    struct Emission {
        std::ostream& sink;
        char const*   text;
    };

    Emission const script[] = {
        {std::cerr, "Error1"},
        {std::cout, "Hello World1"},
        {std::cerr, "Error2"},
        {std::cerr, "Error3"},
        {std::cerr, "Error4"},
        {std::cerr, "Error5"},
        {std::cout, "Hello World2"},
        {std::cerr, "Error6"},
        {std::cout, "Hello World3"},
    };

    for (Emission const& step : script)
        step.sink << step.text << std::endl;
}

#include <boost/process.hpp>
#include <boost/asio.hpp>

// Parent role of the demo: launches `program` with the argument "foo",
// redirects the child's stdout and stderr into two async pipes, pumps both
// into one shared string on a single io_context, and prints that string.
// Boost.Process errors and any other exception are reported on std::cerr.
static void main_program(char const* program) {
    namespace bp = boost::process;
    try {
        using boost::system::error_code;
        using boost::asio::io_context;

        // Per-stream state: async pipe, its input buffer, and a reference to
        // the output string shared by both pumps.
        struct IoPump {
            IoPump(io_context& io, std::string& merged) : _pipe(io), _merged(merged) {}

            boost::asio::streambuf _buf;
            bp::async_pipe         _pipe;
            std::string&           _merged;

            // Issues one async read that completes once "\n" is present in
            // _buf, then re-arms itself from the handler until an error occurs.
            void do_loop() {
                boost::asio::async_read_until(_pipe, _buf, "\n",
                    [this, out = boost::asio::dynamic_buffer(_merged)](
                        error_code ec, size_t xfer) mutable {
                        if (!ec) {
                            // Append the first xfer bytes of _buf to _merged, then discard them from _buf.
                            out.commit(buffer_copy(
                                out.prepare(xfer), _buf.data(), xfer));
                            _buf.consume(xfer);

                            do_loop(); // chain
                        } else {
                            // Any error ends the chain for this pipe; the message is only logged.
                            std::cerr << "IoPump: " << ec.message() << "\n";
                        }
                    });
            }
        };

        io_context io;

        std::string merged;
        IoPump outp{io, merged}, errp{io, merged};

        bp::child child(program, std::vector<std::string> { "foo" },
            bp::std_out > outp._pipe, bp::std_err > errp._pipe);

        outp.do_loop(); // prime the pump
        errp.do_loop(); // prime the pump
        // Runs the queued async reads and their completion handlers on this thread.
        io.run();

        std::cout << "Program output was:" << std::endl;
        std::cout << merged;
    } catch (const bp::process_error& err) {
        std::cerr << "Error: " << err.code().message() << std::endl;
    } catch (...) { // @NOCOVERAGE
        std::cerr << "Unknown error" << std::endl;
    }
}

运行后打印:

IoPump: End of file
IoPump: End of file

和标准输出:

Program output was:
Error1
Error2
Hello World1
Error3
Hello World2
Error4
Hello World3
Error5
Error6

其他例子

我在本站已经给出过很多示例,搜索 async_pipe 即可找到。

跳出思维定式

您可以直接在文件描述符层面把 stderr 重定向到 stdout,问题就解决了!例如:

在 Coliru 上在线运行(Live On Coliru)

    boost::asio::io_context io;
    // Receives everything the child writes to stdout as a single string.
    std::future<std::string> merged;

    // fd.bind(2, 1) comes from the bp::posix namespace: it binds the child's
    // descriptor 2 (stderr) to descriptor 1 (stdout), so both arrive on one pipe.
    // NOTE(review): posix-specific extension - presumably unavailable on Windows; confirm.
    bp::child child(program, std::vector<std::string> { "foo" },
        bp::std_out > merged, bp::posix::fd.bind(2, 1), io);

    io.run();

    std::cout << "Program output was:" << std::quoted(merged.get()) << "\n";

或者使用逐行读取的循环:

在 Coliru 上在线运行(Live On Coliru)

    // Single pipe stream that receives the child's stdout and stderr together.
    bp::ipstream merged;

    // fd.bind(2, 1) binds the child's descriptor 2 (stderr) to descriptor 1
    // (stdout), so both streams are written to the same pipe in program order.
    bp::child child(program, std::vector<std::string> { "foo" },
        bp::std_out > merged, bp::posix::fd.bind(2, 1));

    std::cout << "Program output was:" << std::endl;
    // Drain the pipe BEFORE waiting for the child: a child that writes more
    // than the OS pipe buffer holds blocks until the parent reads, so waiting
    // first could deadlock. The loop ends when the pipe reports end-of-file.
    for (std::string line; getline(merged, line);)
        std::cout << "merged: " << std::quoted(line) << "\n";

    // The output is fully consumed; now wait for the child to exit.
    child.wait();

输出:

Program output was:
merged: "Error1"
merged: "Hello World1"
merged: "Error2"
merged: "Error3"
merged: "Error4"
merged: "Error5"
merged: "Hello World2"
merged: "Error6"
merged: "Hello World3"
于 2021-03-01T15:26:55.833 回答