12

我有以下程序:

package main

import "bytes"
import "io"
import "log"
import "os"
import "os/exec"
import "time"

func main() {
    runCatFromStdinWorks(populateStdin("aaa\n"))
    runCatFromStdinWorks(populateStdin("bbb\n"))
}

func populateStdin(str string) func(io.WriteCloser) {
    return func(stdin io.WriteCloser) {
        defer stdin.Close()
        io.Copy(stdin, bytes.NewBufferString(str))
    }
}

func runCatFromStdinWorks(populate_stdin_func func(io.WriteCloser)) {
    cmd := exec.Command("cat")
    stdin, err := cmd.StdinPipe()
    if err != nil {
        log.Panic(err)
    }
    stdout, err := cmd.StdoutPipe()
    if err != nil {
        log.Panic(err)
    }
    err = cmd.Start()
    if err != nil {
        log.Panic(err)
    }
    go populate_stdin_func(stdin)
    go func() {
            // Removing the following lines allow some output
            // to be fetched from cat's stdout sometimes
            time.Sleep(5 * time.Second)
            io.Copy(os.Stdout, stdout)
    }()
    err = cmd.Wait()
    if err != nil {
        log.Panic(err)
    }
}

在循环中运行时,我没有得到任何结果,如下所示:

$ while true; do go run cat_thingy.go; echo ; done



^C

这个结果是在 Ubuntu 12.04 上从 apt 在虚拟机(go 版本 go1)上安装 golang-go 之后出现的。我无法在 Macbook Air(go 版本 go1.0.3)上的 go 安装上进行复制。这似乎是某种竞赛条件。事实上,如果我设置了一个 sleep(1*time.Second),我永远不会在我的代码中以随机睡眠为代价看到问题。

我在代码中做错了什么,还是这是一个错误?如果是错误,是否已修复?

更新:可能的线索

我发现 Command.Wait 将关闭与 cat 子进程通信的管道,即使它们仍有未读数据。我不太确定处理该问题的正确方法。我想我可以创建一个通道来通知何时完成对标准输入的写入,但我仍然需要知道 cat 进程是否已经结束,以确保不会将任何其他内容写入其标准输出管道。我知道我可以使用 cmd.Process.Wait 来确定进程何时结束,但是然后调用 cmd.Wait 是否安全?

更新:越来越近

这是代码的新剪辑。我相信这对于写入标准输入和从标准输出读取是有效的。我认为如果我从标准输出处理 goroutine 中替换 io.Copy 而没有流的东西,我可以让它正确地流式传输数据(而不是缓冲所有数据)。

package main

import "bytes"
import "fmt"
import "io"
import "log"
import "os/exec"
import "runtime"

const inputBufferBlockLength = 3*64*(2<<10) // enough to be bigger than 2x the pipe buffer of 64KiB
const numInputBlocks = 6

func main() {
    runtime.GOMAXPROCS(5)
    runCatFromStdin(populateStdin(numInputBlocks))
}

func populateStdin(numInputBlocks int) func(io.WriteCloser, chan bool) {
    return func(stdin io.WriteCloser) {
        defer stdin.Close()
        repeatedByteBases := []string{"a", "b", "c", "d", "e", "f"}
        for i := 0; i < numInputBlocks; i++ {
          repeatedBytes := bytes.NewBufferString(repeatedByteBases[i]).Bytes()
          fmt.Printf("%s\n", repeatedBytes)
          io.Copy(stdin, bytes.NewReader(bytes.Repeat(repeatedBytes, inputBufferBlockLength)))
        }
    }
}

func runCatFromStdin(populate_stdin_func func(io.WriteCloser)) {
    cmd := exec.Command("cat")
    stdin, err := cmd.StdinPipe()
    if err != nil {
        log.Panic(err)
    }
    stdout, err := cmd.StdoutPipe()
    if err != nil {
        log.Panic(err)
    }
    err = cmd.Start()
    if err != nil {
        log.Panic(err)
    }
    go populate_stdin_func(stdin)
    output_done_channel := make(chan bool)
    go func() {
        out_bytes := new(bytes.Buffer)
        io.Copy(out_bytes, stdout)
        fmt.Printf("%s\n", out_bytes)
        fmt.Println(out_bytes.Len())
        fmt.Println(inputBufferBlockLength*numInputBlocks)
        output_done_channel <- true
    }()
    <-output_done_channel
    err = cmd.Wait()
    if err != nil {
        log.Panic(err)
    }
}
4

2 回答 2

5

这是您的第一个代码的有效版本。请注意添加了 sync.WaitGroup 以确保在关闭命令之前完成发送和接收 go 例程。

package main

import (
    "bytes"
    "io"
    "log"
    "os"
    "os/exec"
    "sync"
    "time"
)

func main() {
    runCatFromStdinWorks(populateStdin("aaa\n"))
    runCatFromStdinWorks(populateStdin("bbb\n"))
}

func populateStdin(str string) func(io.WriteCloser) {
    return func(stdin io.WriteCloser) {
        defer stdin.Close()
        io.Copy(stdin, bytes.NewBufferString(str))
    }
}

func runCatFromStdinWorks(populate_stdin_func func(io.WriteCloser)) {
    cmd := exec.Command("cat")
    stdin, err := cmd.StdinPipe()
    if err != nil {
        log.Panic(err)
    }
    stdout, err := cmd.StdoutPipe()
    if err != nil {
        log.Panic(err)
    }
    err = cmd.Start()
    if err != nil {
        log.Panic(err)
    }
    var wg sync.WaitGroup
    wg.Add(2)
    go func() {
        defer wg.Done()
        populate_stdin_func(stdin)
    }()
    go func() {
        defer wg.Done()
        time.Sleep(5 * time.Second)
        io.Copy(os.Stdout, stdout)
    }()
    wg.Wait()
    err = cmd.Wait()
    if err != nil {
        log.Panic(err)
    }
}

(这只是表达@peterSO 所说的另一种方式;-)

于 2013-02-11T18:41:57.560 回答
0

Go statements

A "go" statement starts the execution of a function or method call as an independent concurrent thread of control, or goroutine, within the same address space.

GoStmt = "go" Expression .

The expression must be a call. The function value and parameters are evaluated as usual in the calling goroutine, but unlike with a regular call, program execution does not wait for the invoked function to complete. Instead, the function begins executing independently in a new goroutine. When the function terminates, its goroutine also terminates. If the function has any return values, they are discarded when the function completes.

Convert the gratuitous goroutines to function calls.

package main

import (
    "bytes"
    "io"
    "log"
    "os"
    "os/exec"
)

func main() {
    runCatFromStdinWorks(populateStdin("aaa\n"))
    runCatFromStdinWorks(populateStdin("bbb\n"))
}

func populateStdin(str string) func(io.WriteCloser) {
    return func(stdin io.WriteCloser) {
        defer stdin.Close()
        io.Copy(stdin, bytes.NewBufferString(str))
    }
}

func runCatFromStdinWorks(populate_stdin_func func(io.WriteCloser)) {
    cmd := exec.Command("cat")
    stdin, err := cmd.StdinPipe()
    if err != nil {
        log.Panic(err)
    }
    stdout, err := cmd.StdoutPipe()
    if err != nil {
        log.Panic(err)
    }
    err = cmd.Start()
    if err != nil {
        log.Panic(err)
    }
    populate_stdin_func(stdin)
    io.Copy(os.Stdout, stdout)
    err = cmd.Wait()
    if err != nil {
        log.Panic(err)
    }
}
于 2013-02-10T22:49:40.573 回答