0

我正在使用 golang kubernetes 客户端创建 kubernetes pod 并在其中执行远程命令。但是,我发现在远程执行完成之前我无法获得有关远程执行状态的反馈,因为我无法弄清楚如何流式传输远程命令的日志。这是我当前执行远程命令的实现:

func (k *KubernetesClient) RunCommand(ctx context.Context, args *RunCommandArgs) (string, int, error) {
    req := k.clientset.CoreV1().RESTClient().Post().Resource("pods").Name(args.ContainerId).Namespace(k.namespace).SubResource("exec").Param("container", CONTAINER_NAME)
    scheme := runtime.NewScheme()
    if err := v1.AddToScheme(scheme); err != nil {
        return "", 0, fmt.Errorf("could not add to scheme: %w", err)
    }
    parameterCodec := runtime.NewParameterCodec(scheme)
    req.VersionedParams(&v1.PodExecOptions{
        Stdin:     false,
        Stdout:    true,
        Stderr:    true,
        TTY:       false,
        Container: args.ContainerId,
        Command:   []string{"sh", "-c", args.Command},
    }, parameterCodec)
    exec, err := remotecommand.NewSPDYExecutor(k.config, "POST", req.URL())
    if err != nil {
        return "", 0, fmt.Errorf("could not exec command: %w", err)
    }
    var stdout, stderr bytes.Buffer
    var streamErr error
    streamErr = exec.Stream(remotecommand.StreamOptions{
        Stdin:  nil,
        Stdout: &stdout,
        Stderr: &stderr,
        Tty:    false,
    })

    if streamErr != nil {
        if strings.Contains(streamErr.Error(), "command terminated with exit code") {
            return stderr.String(), 1, nil
        } else {
            return "", 0, fmt.Errorf("could not stream results: %w", streamErr)
        }
    }
    return stdout.String(), 0, nil
}

在这个实现中,我不知道远程命令的状态,直到它完成执行,此时我一次获得所有输出日志。

有没有办法在通过调用来读取stdout/stderr时读取它们exec.Stream?在理想的世界中,我希望能够逐行打印远程命令行的输出。我注意到bytes.Buffer有一个ReadString接受分隔符的方法。这看起来像是一个有用的方法,但我无法弄清楚如何使用它。

4

1 回答 1

0

这只是部分答案,但如果我设置使用以下内容PodExecOptionsStreamOptions然后我会看到每个日志行都实时打印(请注意,我使用的是标准输入和标准输出,而不是自定义缓冲区)Ttytrue

v1.PodExecOptions{
        Stdin:     true,
        Stdout:    true,
        Stderr:    false,
        TTY:       true,
        Container: args.ContainerId,
        Command:   []string{"sh", "-c", args.Command},
    }

remotecommand.StreamOptions{
            Stdin:  os.Stdin,
            Stdout: os.Stdout,
            Stderr: nil,
            Tty:    true,
        }

但是,如果我尝试使用其他东西os.Stdinos.Stdout那么我永远不会得到任何日志行。例如,以下用法不会打印任何内容:

    var stdout, stdin bytes.Buffer
    var streamErr error

    go func() {
        streamErr = exec.Stream(remotecommand.StreamOptions{
            Stdin:  &stdin,
            Stdout: &stdout,
            Stderr: nil,
            Tty:    true,
        })
    }()
    time.Sleep(5*time.Second)
    log.Info("doing raw string calls on both buffers")
    log.Info(stdin.String())
    log.Info(stdout.String())
    log.Info("starting scan of stdin")
    scanner := bufio.NewScanner(&stdin)
    scanner.Split(bufio.ScanLines)
    for scanner.Scan() {
        m := scanner.Text()
        fmt.Println(m)
    }
    log.Info("starting scan of stdout")
    scanner = bufio.NewScanner(&stdout)
    scanner.Split(bufio.ScanLines)
    for scanner.Scan() {
        m := scanner.Text()
        fmt.Println(m)
    }
    log.Info("finished scanning of stdout")

我仍在尝试弄清楚如何使用自定义缓冲区,以便我可以管理写入日志的内容,而不是直接通过管道传输到标准输出(我想将一些自定义字段附加到记录的每一行)。


编辑:好吧,我想出了一个可行的解决方案。这是完整的代码

type LogStreamer struct{
    b bytes.Buffer
}

func (l *LogStreamer) String() string {
    return l.b.String()
}

func (l *LogStreamer) Write(p []byte) (n int, err error) {
    a := strings.TrimSpace(string(p))
    l.b.WriteString(a)
    log.Info(a)
    return len(p), nil
}

func (k *KubernetesClient) RunCommand(ctx context.Context, args *RunCommandArgs) (string, int, error) {
    req := k.clientset.CoreV1().RESTClient().Post().Resource("pods").Name(args.ContainerId).Namespace(k.namespace).SubResource("exec").Param("container", "worker")
    scheme := runtime.NewScheme()
    if err := v1.AddToScheme(scheme); err != nil {
        return "", 0, fmt.Errorf("could not add to scheme: %w", err)
    }
    parameterCodec := runtime.NewParameterCodec(scheme)
    req.VersionedParams(&v1.PodExecOptions{
        Stdin:     true,
        Stdout:    true,
        Stderr:    false,
        TTY:       true,
        Container: args.ContainerId,
        Command:   []string{"sh", "-c", args.Command},
    }, parameterCodec)
    exec, err := remotecommand.NewSPDYExecutor(k.config, "POST", req.URL())
    if err != nil {
        return "", 0, fmt.Errorf("could not exec command: %w", err)
    }
    var streamErr error
    l := &LogStreamer{}

    streamErr = exec.Stream(remotecommand.StreamOptions{
        Stdin:  os.Stdin,
        Stdout: l,
        Stderr: nil,
        Tty:    true,
    })

    if streamErr != nil {
        if strings.Contains(streamErr.Error(), "command terminated with exit code") {
            return l.String(), 1, nil
        } else {
            return "", 0, fmt.Errorf("could not stream results: %w", streamErr)
        }
    }
    return l.String(), 0, nil
}

我创建了一个实现io.Writer接口的结构并在StreamOptions结构中使用它。另请注意,我必须os.Stdin在struct中使用,StreamOptions否则只有一行会被流回用于Stdout.

另请注意,我必须修剪传递给的缓冲区,LogStreamer.Write因为回车或换行符似乎会导致 logrus 包出现问题。这个解决方案还有更多的改进,但它肯定朝着正确的方向前进。

于 2021-06-25T20:23:27.870 回答