如果你使用bp::std_out > some_kind_of_buffer_or_future
你通常只会在退出时得到结果。
但是,您可以使用async_pipe
:
bp::async_pipe pipe(io);
bp::child c( //
"/bin/bash",
std::vector<std::string>{
"-c",
"for a in {1..20}; do sleep 1; echo message $a; done",
}, //
bp::std_out > pipe, //
bp::on_exit(on_exit), //
io);
现在,您必须在该管道上显式执行 IO:
boost::asio::streambuf sb;
async_read_until( //
pipe, sb, "message 5\n", //
[&](error_code ec, size_t) { //
std::cout << "Got message 5 (" << ec.message() << ")" << std::endl;
});
这有效:
住在科利鲁
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <boost/asio.hpp>
#include <iostream>
namespace bp = boost::process;
using boost::system::error_code;
namespace /*file-static*/ {
using namespace std::chrono_literals;
static auto now = std::chrono::steady_clock::now;
static const auto t0 = now();
static auto timestamp() {
return std::to_string((now() - t0) / 1.s) + "s ";
}
} // namespace
int main() {
boost::asio::io_context io;
bp::async_pipe pipe(io);
auto on_exit = [](int code, std::error_code ec) {
std::cout << timestamp() << "on_exit: " << ec.message() << " code "
<< code << std::endl;
};
bp::child c( //
"/bin/bash",
std::vector<std::string>{
"-c",
"for a in {1..20}; do sleep 1; echo message $a; done",
}, //
bp::std_out > pipe, //
bp::on_exit(on_exit), //
io);
boost::asio::streambuf sb;
async_read_until( //
pipe, sb, "message 5\n", //
[&](error_code ec, size_t) { //
std::cout << timestamp() << "Got message 5 (" << ec.message() << ")"
<< std::endl;
});
io.run();
}
印刷
5.025400s Got message 5 (Success)
20.100547s on_exit: Success code 0
因此,您可以在您正在寻找的内容发生时对其做出响应。请记住,操作系统和 shell 在管道上进行流缓冲,但默认是行缓冲,因此,您可以期望在打印换行符后立即接收输入。
大缓冲区?
上面有点假设您可以将整个输出缓冲到有趣的消息。如果那是千兆字节呢?只要您的模式不是千兆字节,您就可以继续阅读,直到条件匹配。
让我们把我们的例子变成一个异步 grep,class\s*\w+_heap
在所有的 boost 头文件中查找正则表达式。当然,这是许多兆字节的数据,但我们只使用了 10Kb 的缓冲区:
std::string text;
auto buf = boost::asio::dynamic_buffer(text, 10 * 1024); // max 10 kilobyte
size_t total_received =0;
boost::regex const re(R"(class\s*\w+_heap)");
现在我们创建一个读取循环,读取直到匹配或缓冲区已满:
std::function<void()> wait_for_message;
wait_for_message = [&] {
async_read_until( //
pipe, buf, re, //
[&](error_code ec, size_t received) { //
std::cerr << '\x0d' << timestamp() << "Checking for message ("
<< ec.message() << ", total " << total_received
<< ") ";
if (received || ec != boost::asio::error::not_found) {
total_received += received;
buf.consume(received);
boost::smatch m;
if (regex_search(text, m, re)) {
std::cout << "\n" << timestamp()
<< "Found: " << std::quoted(m.str()) << " at "
<< (total_received - m.size()) << " bytes"
<< std::endl;
}
} else {
// discard 90% of buffer capacity
auto discard =
std::min(buf.max_size() / 10 * 9, buf.size());
total_received += discard;
buf.consume(discard);
}
if (!ec | (ec == boost::asio::error::not_found))
wait_for_message();
else
std::cout << "\n" << timestamp() << ec.message() << std::endl;
});
};
当然,如果匹配超过缓冲区大小的 10%,这个系统可能会丢失匹配(因为我们只保留之前缓冲区内容的 10% 以允许匹配重叠读取边界)。
再次,在 Coliru 上看到它
#include <boost/process.hpp>
#include <boost/process/async.hpp>
#include <boost/asio.hpp>
#include <boost/regex.hpp>
#include <iostream>
#include <iomanip>
namespace bp = boost::process;
using boost::system::error_code;
namespace /*file-static*/ {
using namespace std::chrono_literals;
static auto now = std::chrono::steady_clock::now;
static const auto t0 = now();
static auto timestamp() {
return std::to_string((now() - t0) / 1.s) + "s ";
}
} // namespace
int main() {
boost::asio::io_context io;
bp::async_pipe pipe(io);
auto on_exit = [](int code, std::error_code ec) {
std::cout << timestamp() << "on_exit: " << ec.message() << " code "
<< code << std::endl;
};
bp::child c( //
"/usr/bin/find",
std::vector<std::string>{"/usr/local/include/boost", "-name",
"*.hpp", "-exec", "cat", "{}", "+"},
bp::std_out > pipe, //
bp::on_exit(on_exit), //
io);
std::string text;
auto buf = boost::asio::dynamic_buffer(text, 10 * 1024); // max 10 kilobyte
size_t total_received =0;
boost::regex const re(R"(class\s*\w+_heap)");
std::function<void()> wait_for_message;
wait_for_message = [&] {
async_read_until( //
pipe, buf, re, //
[&](error_code ec, size_t received) { //
std::cerr << '\x0d' << timestamp() << "Checking for message ("
<< ec.message() << ", total " << total_received
<< ") ";
if (received || ec != boost::asio::error::not_found) {
total_received += received;
buf.consume(received);
boost::smatch m;
if (regex_search(text, m, re)) {
std::cout << "\n" << timestamp()
<< "Found: " << std::quoted(m.str()) << " at "
<< (total_received - m.size()) << " bytes"
<< std::endl;
}
} else {
// discard 90% of buffer capacity
auto discard =
std::min(buf.max_size() / 10 * 9, buf.size());
total_received += discard;
buf.consume(discard);
}
if (!ec | (ec == boost::asio::error::not_found))
wait_for_message();
else
std::cout << "\n" << timestamp() << ec.message() << std::endl;
});
};
wait_for_message();
io.run();
std::cout << timestamp() << " - Done, total_received: " << total_received << "\n";
}
哪个打印
2.033324s Found: "class d_ary_heap" at 6747512 bytes
2.065290s Found: "class pairing_heap" at 6831390 bytes
2.071888s Found: "class binomial_heap" at 6860833 bytes
2.072715s Found: "class skew_heap" at 6895677 bytes
2.073348s Found: "class fibonacci_heap" at 6921559 bytes
34.729355s End of file
34.730515s on_exit: Success code 0
34.730593s - Done, total_received: 154746011
或者住在我的机器上: