3

我想实现一个futures::Stream用于读取和解析子子进程的标准输出的方法。

我现在在做什么:

  • 生成子进程并通过std::process方法获取其标准输出:let child = Command::new(...).stdout(Stdio.pipe()).spawn().expect(...)

  • 添加AsyncReadBufRead到标准输出:

    let stdout = BufReader::new(tokio_io::io::AllowStdIo::new(
        child.stdout.expect("Failed to open stdout"),
    ));
    
  • 为标准输出声明一个包装结构:

    struct MyStream<Io: AsyncRead + BufRead> {
        io: Io,
    }
    
  • 实施Stream

    impl<Io: AsyncRead + BufRead> Stream for MyStream<Io> {
        type Item = Message;
        type Error = Error;
    
        fn poll(&mut self) -> Poll<Option<Message>, Error> {
            let mut line = String::new();
            let n = try_nb!(self.io.read_line(&mut line));
            if n == 0 {
                return Ok(None.into());
            }
            //...read & parse further
        }
    }
    

问题是它AllowStdIo不会ChildStdout神奇地异步并且self.io.read_line调用仍然阻塞。

我想我需要传递一些不同的东西而不是Stdio::pipe()让它异步,但是什么?或者有不同的解决方案吗?

这个问题不同于What is the best approach to 在 future-rs 中封装阻塞 I/O?因为我想为子进程的特定情况获取异步I/O,而不是解决同步I/O的封装问题。

更新:我正在使用tokio = "0.1.3"它的运行时功能,目前使用tokio-process不是一个选项(https://github.com/alexcrichton/tokio-process/issues/27

4

1 回答 1

3

tokio-processcrate 为您提供了一个特性CommandExt,允许您异步生成命令。

结果Child有一个 getterChildStdout实现Read并且是非阻塞的。

像你在你的例子中所做的那样包裹tokio_process::ChildStdout起来AllowStdIo应该让它工作!

于 2018-03-12T23:45:32.923 回答