我正在使用 python 测试子流程管道。我知道我可以直接在 python 中执行下面的程序,但这不是重点。我只想测试管道,所以我知道如何使用它。
我的系统是 Linux Ubuntu 9.04,默认 python 2.6。
我从这个文档示例开始。
from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
output = p2.communicate()[0]
print output
这行得通,但由于p1
'sstdin
没有被重定向,我必须在终端中输入内容来馈送管道。当我输入^D
关闭标准输入时,我得到了我想要的输出。
但是,我想使用 python 字符串变量将数据发送到管道。首先我尝试在标准输入上写:
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.stdin.write('test\n')
output = p2.communicate()[0] # blocks forever here
没用。我尝试p2.stdout.read()
在最后一行使用,但它也会阻塞。我添加了p1.stdin.flush()
,p1.stdin.close()
但它也没有工作。我然后我开始交流:
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.communicate('test\n') # blocks forever here
output = p2.communicate()[0]
所以还是不是这样。
我注意到运行单个进程(如p1
上面,删除p2
)可以完美运行。并且将文件句柄传递给p1
( stdin=open(...)
) 也可以。所以问题是:
是否可以在不阻塞的情况下将数据传递到 python 中的 2 个或多个子进程的管道?为什么不?
我知道我可以运行一个 shell 并在 shell 中运行管道,但这不是我想要的。
更新 1:按照下面 Aaron Digulla 的提示,我现在正在尝试使用线程来使其工作。
首先,我尝试在线程上运行 p1.communicate。
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=p1.communicate, args=('some data\n',))
t.start()
output = p2.communicate()[0] # blocks forever here
好吧,没用。尝试了其他组合,例如将其更改为.write()
and p2.read()
。没有什么。现在让我们尝试相反的方法:
def get_output(subp):
output = subp.communicate()[0] # blocks on thread
print 'GOT:', output
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,))
t.start()
p1.communicate('data\n') # blocks here.
t.join()
代码最终在某处阻塞。在衍生线程中,或在主线程中,或两者兼而有之。所以它没有用。如果你知道如何让它工作,如果你能提供工作代码会更容易。我在这里试试。
更新 2
Paul Du Bois 在下面回答了一些信息,所以我做了更多的测试。我已经阅读了整个subprocess.py
模块并了解了它的工作原理。因此,我尝试将其完全应用于代码。
我在 linux 上,但由于我正在使用线程进行测试,所以我的第一种方法是复制在subprocess.py
'scommunicate()
方法上看到的确切 Windows 线程代码,但是对于两个进程而不是一个进程。这是我尝试过的完整列表:
import os
from subprocess import Popen, PIPE
import threading
def get_output(fobj, buffer):
while True:
chunk = fobj.read() # BLOCKS HERE
if not chunk:
break
buffer.append(chunk)
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
b = [] # create a buffer
t = threading.Thread(target=get_output, args=(p2.stdout, b))
t.start() # start reading thread
for x in xrange(100000):
p1.stdin.write('hello world\n') # write data
p1.stdin.flush()
p1.stdin.close() # close input...
t.join()
出色地。它没有用。即使在p1.stdin.close()
被调用之后,p2.stdout.read()
仍然会阻塞。
然后我尝试了posix代码subprocess.py
:
import os
from subprocess import Popen, PIPE
import select
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
numwrites = 100000
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer
while to_read or to_write:
read_now, write_now, xlist = select.select(to_read, to_write, [])
if read_now:
data = os.read(p2.stdout.fileno(), 1024)
if not data:
p2.stdout.close()
to_read = []
else:
b.append(data)
if write_now:
if numwrites > 0:
numwrites -= 1
p1.stdin.write('hello world!\n'); p1.stdin.flush()
else:
p1.stdin.close()
to_write = []
print b
也阻止select.select()
. 通过传播print
s,我发现了这一点:
- 阅读是有效的。代码在执行过程中多次读取。
- 写作也在工作。数据写入
p1.stdin
. - 结束时
numwrites
,p1.stdin.close()
被调用。 - 当
select()
开始阻塞时,只有to_read
一些东西,p2.stdout
。to_write
已经是空的了。 os.read()
call 总是返回一些东西,所以p2.stdout.close()
永远不会被调用。
两个测试的结论:关闭stdin
管道上的第一个进程(grep
在示例中)不会使其将其缓冲输出转储到下一个进程并死掉。
没有办法让它工作?
PS:我不想使用临时文件,我已经用文件进行了测试,我知道它可以工作。而且我不想使用窗户。