我有以下程序:
package main
import "bytes"
import "io"
import "log"
import "os"
import "os/exec"
import "time"
func main() {
runCatFromStdinWorks(populateStdin("aaa\n"))
runCatFromStdinWorks(populateStdin("bbb\n"))
}
func populateStdin(str string) func(io.WriteCloser) {
return func(stdin io.WriteCloser) {
defer stdin.Close()
io.Copy(stdin, bytes.NewBufferString(str))
}
}
func runCatFromStdinWorks(populate_stdin_func func(io.WriteCloser)) {
cmd := exec.Command("cat")
stdin, err := cmd.StdinPipe()
if err != nil {
log.Panic(err)
}
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Panic(err)
}
err = cmd.Start()
if err != nil {
log.Panic(err)
}
go populate_stdin_func(stdin)
go func() {
// Removing the following lines allow some output
// to be fetched from cat's stdout sometimes
time.Sleep(5 * time.Second)
io.Copy(os.Stdout, stdout)
}()
err = cmd.Wait()
if err != nil {
log.Panic(err)
}
}
在循环中运行时,我没有得到任何结果,如下所示:
$ while true; do go run cat_thingy.go; echo ; done
^C
这个结果是在 Ubuntu 12.04 上从 apt 在虚拟机(go 版本 go1)上安装 golang-go 之后出现的。我无法在 Macbook Air(go 版本 go1.0.3)上的 go 安装上进行复制。这似乎是某种竞赛条件。事实上,如果我设置了一个 sleep(1*time.Second),我永远不会在我的代码中以随机睡眠为代价看到问题。
我在代码中做错了什么,还是这是一个错误?如果是错误,是否已修复?
更新:可能的线索
我发现 Command.Wait 将关闭与 cat 子进程通信的管道,即使它们仍有未读数据。我不太确定处理该问题的正确方法。我想我可以创建一个通道来通知何时完成对标准输入的写入,但我仍然需要知道 cat 进程是否已经结束,以确保不会将任何其他内容写入其标准输出管道。我知道我可以使用 cmd.Process.Wait 来确定进程何时结束,但是然后调用 cmd.Wait 是否安全?
更新:越来越近
这是代码的新剪辑。我相信这对于写入标准输入和从标准输出读取是有效的。我认为如果我从标准输出处理 goroutine 中替换 io.Copy 而没有流的东西,我可以让它正确地流式传输数据(而不是缓冲所有数据)。
package main
import "bytes"
import "fmt"
import "io"
import "log"
import "os/exec"
import "runtime"
const inputBufferBlockLength = 3*64*(2<<10) // enough to be bigger than 2x the pipe buffer of 64KiB
const numInputBlocks = 6
func main() {
runtime.GOMAXPROCS(5)
runCatFromStdin(populateStdin(numInputBlocks))
}
func populateStdin(numInputBlocks int) func(io.WriteCloser, chan bool) {
return func(stdin io.WriteCloser) {
defer stdin.Close()
repeatedByteBases := []string{"a", "b", "c", "d", "e", "f"}
for i := 0; i < numInputBlocks; i++ {
repeatedBytes := bytes.NewBufferString(repeatedByteBases[i]).Bytes()
fmt.Printf("%s\n", repeatedBytes)
io.Copy(stdin, bytes.NewReader(bytes.Repeat(repeatedBytes, inputBufferBlockLength)))
}
}
}
func runCatFromStdin(populate_stdin_func func(io.WriteCloser)) {
cmd := exec.Command("cat")
stdin, err := cmd.StdinPipe()
if err != nil {
log.Panic(err)
}
stdout, err := cmd.StdoutPipe()
if err != nil {
log.Panic(err)
}
err = cmd.Start()
if err != nil {
log.Panic(err)
}
go populate_stdin_func(stdin)
output_done_channel := make(chan bool)
go func() {
out_bytes := new(bytes.Buffer)
io.Copy(out_bytes, stdout)
fmt.Printf("%s\n", out_bytes)
fmt.Println(out_bytes.Len())
fmt.Println(inputBufferBlockLength*numInputBlocks)
output_done_channel <- true
}()
<-output_done_channel
err = cmd.Wait()
if err != nil {
log.Panic(err)
}
}