如果可以把 command1 的全部输出读入内存，然后依次运行 command2、command3：
#!/usr/bin/env python
"""Feed the buffered output of one command to several others in turn."""
from subprocess import Popen, PIPE, check_output as qx


def _pipe_through(argv, data):
    """Run *argv* with *data* on its stdin and return what it wrote to stdout."""
    child = Popen(argv, stdin=PIPE, stdout=PIPE)
    return child.communicate(data)[0]


# Capture everything the first command prints (held entirely in memory).
cmd1_output = qx(['ls'])

# Run the follow-up commands one after another; each gets the same input.
results = [
    _pipe_through(cmd, cmd1_output)
    for cmd in [['cat'], ['tr', 'a-z', 'A-Z']]
]
或者，如果像 @Marwan Alsabbagh 所说的那样，command1 会生成无法放入内存的巨大输出，您可以先将其写入临时文件：
#!/usr/bin/env python
"""Spool one command's output to a temporary file and replay it to others."""
import tempfile
from subprocess import check_call, check_output as qx

# The temporary file is deleted automatically when it is closed.
with tempfile.TemporaryFile() as tmp:
    # Run command1 and wait for it to finish; its stdout goes to the file.
    check_call(['ls'], stdout=tmp)

    # Run the follow-up commands in sequence.
    results = []
    for cmd in [['cat'], ['tr', 'a-z', 'A-Z']]:
        # The previous child advanced the shared file offset; rewind so this
        # command reads the captured output from the beginning.
        tmp.seek(0)
        results.append(qx(cmd, stdin=tmp))
要并行处理子进程的输入/输出，您可以使用 threading 模块：
#!/usr/bin/env python3
from contextlib import ExitStack # pip install contextlib2 (stdlib since 3.3)
from subprocess import Popen, PIPE
from threading import Thread
def tee(fin, *files):
    """Copy everything read from *fin* to each of *files*, then close them all.

    *fin* is read in 1 KiB chunks until ``read`` returns ``b''``; every chunk
    is written to each object in *files*, in the order given.

    Args:
        fin: Binary readable object providing ``read(size)`` and ``close()``.
        *files: Binary writable objects providing ``write(data)`` and
            ``close()``.

    Whether the copy completes or is interrupted by an exception, *fin* and
    every object in *files* is closed afterwards. An ``OSError`` raised by a
    ``close()`` call is ignored so the remaining objects still get closed.
    """
    try:
        for chunk in iter(lambda: fin.read(1 << 10), b''):
            for f in files:  # fan out
                f.write(chunk)
    finally:
        for f in (fin,) + files:
            try:
                f.close()
            except OSError:
                pass  # best-effort cleanup: keep closing the rest
# Start a source command, fan its output out to a file and to the stdin of
# several other commands, and collect each command's output lines in parallel.
# ExitStack runs the registered callbacks in reverse order of registration
# when the block exits, including when an exception is raised inside it.
with ExitStack() as stack:
    # run commands asynchronously
    source_proc = Popen(["command1", "arg1"], stdout=PIPE)
    stack.callback(source_proc.wait)
    stack.callback(source_proc.stdout.close)

    processes = []
    for command in [["tr", "a-z", "A-Z"], ["cat"]]:
        processes.append(Popen(command, stdin=PIPE, stdout=PIPE))
        # use .terminate() if closing the pipes doesn't kill it
        stack.callback(processes[-1].wait)
        stack.callback(processes[-1].stdout.close)
        stack.callback(processes[-1].stdin.close)

    fout = open("test.txt", "wb")
    stack.callback(fout.close)

    # fan out source_proc's output
    Thread(target=tee, args=([source_proc.stdout, fout] +
                             [p.stdin for p in processes])).start()

    # collect results in parallel: one list of output lines per process,
    # each filled by its own reader thread until readline() returns b''
    results = [[] for _ in processes]
    threads = [Thread(target=r.extend, args=[iter(p.stdout.readline, b'')])
               for p, r in zip(processes, results)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # wait for completion
我在这里使用了 ExitStack，以便在出现异常时也能正确地进行清理。