我想实现一个futures::Stream
用于读取和解析子子进程的标准输出的方法。
我现在在做什么:
生成子进程并通过
std::process
方法获取其标准输出:let child = Command::new(...).stdout(Stdio.pipe()).spawn().expect(...)
添加
AsyncRead
和BufRead
到标准输出:let stdout = BufReader::new(tokio_io::io::AllowStdIo::new( child.stdout.expect("Failed to open stdout"), ));
为标准输出声明一个包装结构:
struct MyStream<Io: AsyncRead + BufRead> { io: Io, }
实施
Stream
:impl<Io: AsyncRead + BufRead> Stream for MyStream<Io> { type Item = Message; type Error = Error; fn poll(&mut self) -> Poll<Option<Message>, Error> { let mut line = String::new(); let n = try_nb!(self.io.read_line(&mut line)); if n == 0 { return Ok(None.into()); } //...read & parse further } }
问题是它AllowStdIo
不会ChildStdout
神奇地异步并且self.io.read_line
调用仍然阻塞。
我想我需要传递一些不同的东西而不是Stdio::pipe()
让它异步,但是什么?或者有不同的解决方案吗?
这个问题不同于What is the best approach to 在 future-rs 中封装阻塞 I/O?因为我想为子进程的特定情况获取异步I/O,而不是解决同步I/O的封装问题。
更新:我正在使用tokio = "0.1.3"
它的运行时功能,目前使用tokio-process
不是一个选项(https://github.com/alexcrichton/tokio-process/issues/27)