2

注意:我在这篇文章中交替使用“客户端”和“孩子”这两个词来指代从“服务器”启动的进程。

我正在使用 boost::process::async_pipe 编写使用 boost::process::child 启动的进程的 STDIN。假设我的服务器程序看起来像这样:

(这不是一个有效的服务器演示)

服务器.cpp

int main()
{
    using namespace std::chrono_literals;

    boost::process::async_pipe writePipe;
    boost::process::child child { "client", boost::process::std_in < _writePipe };

    std::vector<char> buffer;
    buffer.resize(1024u * 1024u);

    while (working)
    {
        auto length = 0u;

        /* 
            do a bunch of work that takes a long time
            and also determines `length`, in this case I'm 
            adding a sleep to simulate the time between 
            calls to `async_write()`
        */

        std::this_thread::sleep_for(5s);

        boost::asio::async_write(writePipe,
            boost::asio::buffer(buffer.data(), length),
            [&writePipe](boost::system::error_code, size_t)
            {
                // writePipe.close();
            });


        /* 
            I know that this code as-is would have issues with
            synchronizing `buffer`, but for the purpose of this
            question please ignore that
        */
    }
}

基本上我有一个内存缓冲区,我正在其中做一些工作,并且我经常想向子进程发送一些二进制数据。我的子进程看起来像这样:

孩子.cpp

#include <iostream>
#include <string_view>

void print_hex(const char* p, std::size_t size)
{
    std::string_view input(p, size);

    static const char* const lut = "0123456789ABCDEF";
    size_t len = input.length();

    std::string output;
    output.reserve(2 * len);
    for (size_t i = 0; i < len; ++i)
    {
        const unsigned char c = static_cast<const unsigned char>(input[i]);
        // output.append("0x");
        output.push_back(lut[c >> 4]);
        output.push_back(lut[c & 15]);
        output.append(" ");
    }

    if (output.size() > 0) output.pop_back();
    std::cout << "HEX (" << size<< "): " << output << std::endl;
}

int main()
{
    std::vector<char> buffer;
    buffer.resize(BUFFER_SIZE);

    bool done = false;
    while (!done)
    {
        auto rdbuf = std::cin.rdbuf();
        while (auto count = rdbuf->sgetn(buffer.data(), BUFFER_SIZE))
        {
            print_hex(buffer.data(), count);
        }
    }
}

writePipe.close()注释掉后,我注意到我的子程序在服务器进程终止之前从未获得任何数据。如果我取消注释关闭管道的调用,那么我只能在第一次boost::asio::async_write()调用时处理数据。

编辑:

不幸的是,@sehe 的原始答案并没有解决这个问题。我稍微更新了服务器代码以更好地说明问题(并且我解决了保留/调整大小问题)。

然而,当我再次环顾四周时,我读到了一些关于sgetn()它的语言:

streambuf 中 xsgetn 的默认定义从受控输入序列中检索字符并将它们存储在 s 指向的数组中,直到提取了 n 个字符或到达序列的末尾。

所以,我重构了我的客户端,首先询问流有多少字节可用,然后分块读取流。这是我的第一次尝试:

    bool done = false;
    while (!done)
    {
        auto rdbuf = std::cin.rdbuf();
        const auto available = rdbuf->in_avail();
        if (available == 0)
        {
            continue;
        }

        auto bytesToRead = std::min(BUFFER_SIZE, static_cast<std::uint32_t>(available));
        auto bytesRead = rdbuf->sgetn(buffer.data(), bytesToRead);
        print_hex(buffer.data(), bytesRead);

        while (bytesRead < available)
        {
            bytesToRead = std::min(BUFFER_SIZE, static_cast<std::uint32_t>(available - bytesRead));
            bytesRead += rdbuf->sgetn(buffer.data(), bytesToRead);
            print_hex(buffer.data(), bytesRead);
        }
    }

但是,即使在添加之后std::cin.sync_with_stdio(false);(来自答案Why does in_avail() output zero even if the stream has some char?),调用rdbuf->in_avail()总是返回0。即使我尝试在我的服务器之外并在命令行上尝试,例如:ls | client

我希望我的客户端程序在数据进入时读取数据,而不必(1)关闭服务器进程或(2)关闭管道(除非我可以重新打开管道以执行后续write()。

谢谢!

4

1 回答 1

1

钱币。我花了大量时间调整服务器,直到它工作为止。

原来有......客户端的一个错误。

基本上,你写的地方.reserve(...)应该放.resize()

buffer.resize(BUFFER_SIZE);

服务器中也可能存在类似的错误,即使您不显示完整的代码,那里的保留似乎也很奇怪。

现在,我不确定服务器部分实际需要多少更改,但让我把它说出来,因为我最终成功地对其进行了测试。

完全异步

住在科利鲁

#include <boost/asio/io_service.hpp>
#include <boost/process.hpp>
#include <iostream>
#include <random>

static std::mt19937 prng{ std::random_device{}() };

static std::uniform_int_distribution<size_t> lendist(10, 32);
static std::uniform_int_distribution<char> a_z('a', 'z');

static size_t gen_length() { return lendist(prng); }
static char   gen_alpha()  { return a_z(prng);     }

namespace bp = boost::process;
namespace ba = boost::asio;
using namespace std::chrono_literals;

int main() {
    ba::io_service io; // one thread
    //ba::io_service::strand strand(io);
    auto& strand = io;
    bp::async_pipe writePipe(io);
    //bp::child child(bp::exe("/home/sehe/Projects/stackoverflow/child.exe"),
    bp::child child(bp::exe("./child.exe"),
            io,
            bp::std_in < writePipe,
            bp::std_out > "child.log");

    auto shutdown_sequence = [&] {
        std::this_thread::sleep_for(1s);
        std::clog << "Closing" << std::endl;
        post(strand, [&] { writePipe.close(); });
    };

    std::function<void()> work_loop;

    work_loop = [&, buffer = std::vector<char>(1 << 20)]() mutable {
        size_t length = gen_length();
        std::generate_n(buffer.data(), length, gen_alpha);

        async_write(writePipe, bp::buffer(buffer.data(), length), 
            [&strand, &shutdown_sequence, &work_loop](boost::system::error_code ec, size_t tx) {
                std::clog << "Wrote " << tx << " bytes (" << ec.message() << ")" << std::endl;
                if (ec || (tx == 29)) { // magic length indicates "work done"
                    post(strand, shutdown_sequence);
                } else {
                    post(strand, work_loop);
                }
            });
    };

    // start IO pump
    post(strand, work_loop);
    io.run();

    std::clog << "Bye" << std::endl;
}

运行时,打印类似于

./main.exe
Wrote 13 bytes (Success)
Wrote 11 bytes (Success)
Wrote 26 bytes (Success)
Wrote 32 bytes (Success)
Wrote 17 bytes (Success)
Wrote 24 bytes (Success)
Wrote 28 bytes (Success)
Wrote 29 bytes (Success)
Closing
Bye

同时还写了一个child.log

HEX (32): 71 74 79 6E 77 74 66 63 74 72 66 6D 6D 69 6E 68 73 61 6F 75 68 77 69 77 6B 65 77 6F 76 6D 76 6E
HEX (32): 68 67 72 79 77 7A 74 68 6A 77 65 63 78 64 66 76 6A 61 64 7A 72 6C 74 6E 63 74 6B 71 64 73 7A 70
HEX (32): 70 73 77 79 75 61 70 7A 6D 73 6F 77 68 71 6A 6B 62 6F 77 63 70 63 6D 74 79 70 70 67 6B 64 75 63
HEX (32): 78 6A 79 65 78 68 74 69 75 7A 67 73 67 63 6D 69 73 65 64 63 67 6C 72 75 72 66 76 79 74 75 61 6F
HEX (32): 76 69 75 6D 73 76 6E 79 72 6C 77 6D 69 6F 74 71 6D 76 77 6F 6E 70 73 77 6C 6B 75 68 76 74 71 74
HEX (20): 79 71 77 77 61 75 71 6A 73 68 67 71 72 7A 77 6C 66 67 74 67
于 2020-12-16T00:26:37.313 回答