3

我正在从 python 编译一个要在 unix 系统上执行的命令,其中包含多个管道步骤。例如:

grep foo myfile | cmd1 | cmd2 > output

foo这些对应于对其中包含 from的条目进行的一系列转换myfile。我有时将它构建为由模块执行的命令os.system,在其他情况下使用subprocess模块执行。

如何从 Python 对管道的每个部分进行错误检查?例如,在 99% 的情况下,有foo入口myfile和剩余的管道工程。但是如果由于某种原因myfile存在但不包含foo条目,那么管道的其余部分会中断,因为您正在管道传输空文件,并且其余命令需要非空输入才能工作。这使得调试变得困难,因为您所看到的只是损坏管道的输出,不知道哪个中间步骤失败了。

有没有办法在 Python 中构造管道来检查中间步骤的错误?例如检查grep管道的部分是否真的包含一些输出,cmd1输入上的输出是否grep也包含一些输出,等等?谢谢。

4

3 回答 3

1

这是我的方法,在 Python 2.6 Linux 上进行了测试。

a.定义一个方法

def run_command(command):
    p = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    out, err = p.communicate()
    return (p.returncode, out, err)

b.按以下方式使用

def test_run_command_pipestatus(self):
    # the exit $PIPESTATUS is the return code of "exit 2". The default return code is the return code of last command in the pipe
    return_code, out, err = run_command("exit 2 | tail -n 1; exit $PIPESTATUS")
    print "return_code = %d" %return_code
    print out
    print err
    self.assertEqual(2, return_code)
于 2014-01-08T09:19:33.150 回答
0

这是我的方法,无论 mutch 管道如何命令。它是所有状态代码的总和,并以“echo 1 |echo 2”或“cat test |echo 2”的结果退出并由您的命令替换

>>getstatusoutput('status=0;echo 1 |echo 2;for ((i=0;i<${#PIPESTATUS[*]};i++));让 status=${status}+${PIPESTATUS[ ${i}]};完成;退出 $status')

(0, '2')

>> getstatusoutput('status=0;cat test |echo 2;for ((i=0;i<${#PIPESTATUS[*]};i++));do let status=${status}+${PIPESTATUS[ ${i}]};完成;退出 $status')

(1, '2\ncat: test: 没有这样的文件或目录')

于 2020-11-20T06:35:38.287 回答
0

与 Mingjiang Shi 的答案基本相同,其中几乎包含您需要的所有关键信息,但没有额外的功能,并且修复了问题。(并在 python 2.7.12 和 3.5.2 中测试)

$PIPESTATUS 在 sh 中不存在,因此如果您的发行版没有/bin/sh类似的行为bash(例如使用 的 Ubuntu dash),则必须将可执行文件设置为 bash。它是一个数组,所以我更喜欢像一个数组一样使用它。

而且我建议使用wait()而不是communicate(),因此您可以过滤而不是 slurp (在读入小缓冲区时使用,而不是在处理任何内容之前将整个输出加载到内存中)。

所以你需要做的就是:

  • 应该使用字符串,而不是参数列表
  • 使用壳=真
  • 使用可执行文件="/bin/bash"

代码:

p = subprocess.Popen("false | true; exit ${PIPESTATUS[0]}", shell=True, executable='/bin/bash', stdout=subprocess.PIPE, stderr=subprocess.PIPE)

# at this point you do your filtering, eg. using p.stdout.readinto(buf)

p.wait()
if p.returncode != 0:
    # handle it here
于 2016-11-26T20:06:12.510 回答