
The code looks like this:

from subprocess import Popen, PIPE


p1 = Popen("command1", stdout=PIPE)
p2 = Popen("command2", stdin=p1.stdout, stdout=PIPE)
result_a = p2.communicate()[0]

p1_again = Popen("command1", stdout=PIPE)
p3 = Popen("command3", stdin=p1_again.stdout, stdout=PIPE)
result_b = p3.communicate()[0]

with open("test", "w") as tf:
    p1_again_again = Popen("command1", stdout=tf)
    p1_again_again.communicate()

The bad part is:

command1 is executed three times, because once I call communicate() on a Popen object, its stdout can't be used again. I'm just wondering whether there is a way to reuse the PIPE.

Does anyone know how to make this code better (better performance, and fewer lines of code)? Thanks!
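The limitation described above can be reproduced in a few lines (a minimal sketch; `echo` stands in for `command1`):

```python
from subprocess import Popen, PIPE

p = Popen(["echo", "hi"], stdout=PIPE)
first = p.communicate()[0]  # drains the pipe, then closes it
assert first == b"hi\n"
assert p.stdout.closed      # the stdout file object cannot be read again
```

Any attempt to pass `p.stdout` to a second process after `communicate()` returns would read from a closed file.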


2 Answers


Here is a working solution. I've put in sample commands for cmd1, cmd2, and cmd3 so you can run it. It simply takes the output of the first command, uppercases it in one command, and lowercases it in the other.

Code

from subprocess import Popen, PIPE
from tempfile import TemporaryFile

cmd1 = ['echo', 'Hi']
cmd2 = ['tr', '[:lower:]', '[:upper:]']
cmd3 = ['tr', '[:upper:]', '[:lower:]']

with TemporaryFile() as f:
    p = Popen(cmd1, stdout=f)
    ret_code = p.wait()
    f.flush()
    f.seek(0)
    out2 = Popen(cmd2, stdin=f, stdout=PIPE).stdout.read()
    f.seek(0)
    out3 = Popen(cmd3, stdin=f, stdout=PIPE).stdout.read()
    print(out2.decode(), end='')
    print(out3.decode(), end='')

Output

HI
hi

A few things to note about the solution. The tempfile module is always a good approach when you need to work with temporary files: once the with statement exits it automatically deletes the temporary file as cleanup, even if an IO exception is raised inside the with block. cmd1 is run once, writing its output to the temporary file; wait() is called to make sure execution has completed; then we seek(0) before each use so that when the read() method is called on f it reads from the beginning of the file. For reference, the question Saving stdout from subprocess.Popen to file helped me with the first part of the solution.
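The cleanup guarantee mentioned above is easy to verify (a small sketch, not part of the original answer):

```python
from tempfile import TemporaryFile

try:
    with TemporaryFile() as f:
        f.write(b"scratch data")
        raise IOError("simulated failure inside the with block")
except IOError:
    pass

assert f.closed  # the file was closed (and deleted) despite the exception
```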

answered 2012-11-22T03:43:23.703

If you can read all of command1's output into memory, then you can run command2 and command3 one after another:

#!/usr/bin/env python
from subprocess import Popen, PIPE, check_output as qx

cmd1_output = qx(['ls']) # get all output

# run commands in sequence
results = [Popen(cmd, stdin=PIPE, stdout=PIPE).communicate(cmd1_output)[0]
           for cmd in [['cat'], ['tr', 'a-z', 'A-Z']]]
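As a quick check of the pattern above (with `printf`, `cat`, and `tr` as stand-ins for the real commands), every process in the sequence receives the complete captured output:

```python
from subprocess import Popen, PIPE, check_output as qx

cmd1_output = qx(['printf', 'a\nb\n'])  # stand-in for command1
results = [Popen(cmd, stdin=PIPE, stdout=PIPE).communicate(cmd1_output)[0]
           for cmd in [['cat'], ['tr', 'a-z', 'A-Z']]]
assert results == [b'a\nb\n', b'A\nB\n']  # each command saw the full output
```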

Or, as @Marwan Alsabbagh suggested, if command1 generates huge output that can't fit in memory, you could write to a temporary file first:

#!/usr/bin/env python
import tempfile
from subprocess import check_call, check_output as qx

with tempfile.TemporaryFile() as file: # deleted automatically on closing
    # run command1, wait for completion
    check_call(['ls'], stdout=file)

    # run commands in sequence
    results = []
    for cmd in [['cat'], ['tr', 'a-z', 'A-Z']]:
        file.seek(0)
        results.append(qx(cmd, stdin=file))

To process the subprocesses' input/output in parallel, you could use threading:

#!/usr/bin/env python3
from contextlib import ExitStack  # pip install contextlib2 (stdlib since 3.3)
from subprocess import Popen, PIPE
from threading  import Thread

def tee(fin, *files):
    try:
        for chunk in iter(lambda: fin.read(1 << 10), b''):
            for f in files:  # fan out
                f.write(chunk)
    finally:
        for f in (fin,) + files:
            try:
                f.close()
            except OSError:
                pass

with ExitStack() as stack:
    # run commands asynchronously
    source_proc = Popen(["command1", "arg1"], stdout=PIPE)
    stack.callback(source_proc.wait)
    stack.callback(source_proc.stdout.close)

    processes = []
    for command in [["tr", "a-z", "A-Z"], ["cat"]]:
        processes.append(Popen(command, stdin=PIPE, stdout=PIPE))
        stack.callback(processes[-1].wait)
        stack.callback(processes[-1].stdout.close) # use .terminate()
        stack.callback(processes[-1].stdin.close)  # if it doesn't kill it

    fout = open("test.txt", "wb")
    stack.callback(fout.close)

    # fan out source_proc's output
    Thread(target=tee, args=([source_proc.stdout, fout] +
                             [p.stdin for p in processes])).start()

    # collect results in parallel
    results = [[] for _ in range(len(processes))]
    threads = [Thread(target=r.extend, args=[iter(p.stdout.readline, b'')])
               for p, r in zip(processes, results)]
    for t in threads: t.start()
    for t in threads: t.join() # wait for completion

I used ExitStack here for proper cleanup in case of exceptions.
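ExitStack runs its registered callbacks in last-in, first-out order on exit, mirroring nested with blocks, which is why the stdin/stdout close callbacks above are registered before the corresponding wait. A minimal illustration:

```python
from contextlib import ExitStack

order = []
with ExitStack() as stack:
    stack.callback(order.append, "registered first")
    stack.callback(order.append, "registered second")

# callbacks fired last-in, first-out on leaving the with block
assert order == ["registered second", "registered first"]
```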

answered 2012-11-22T11:49:49.663