34

我正在使用 python 测试子流程管道。我知道我可以直接在 python 中执行下面的程序,但这不是重点。我只想测试管道,所以我知道如何使用它。

我的系统是 Linux Ubuntu 9.04,默认 python 2.6。

我从这个文档示例开始。

from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
output = p2.communicate()[0]
print output

这行得通,但由于p1'sstdin没有被重定向,我必须在终端中输入内容来馈送管道。当我输入^D关闭标准输入时,我得到了我想要的输出。

但是,我想使用 python 字符串变量将数据发送到管道。首先我尝试在标准输入上写:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.stdin.write('test\n')
output = p2.communicate()[0] # blocks forever here

没用。我尝试p2.stdout.read()在最后一行使用,但它也会阻塞。我添加了p1.stdin.flush()p1.stdin.close()但它也没有工作。我然后我开始交流:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.communicate('test\n') # blocks forever here
output = p2.communicate()[0] 

所以还是不是这样。

我注意到运行单个进程(如p1上面,删除p2)可以完美运行。并且将文件句柄传递给p1( stdin=open(...)) 也可以。所以问题是:

是否可以在不阻塞的情况下将数据传递到 python 中的 2 个或多个子进程的管道?为什么不?

我知道我可以运行一个 shell 并在 shell 中运行管道,但这不是我想要的。


更新 1:按照下面 Aaron Digulla 的提示,我现在正在尝试使用线程来使其工作。

首先,我尝试在线程上运行 p1.communicate。

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=p1.communicate, args=('some data\n',))
t.start()
output = p2.communicate()[0] # blocks forever here

好吧,没用。尝试了其他组合,例如将其更改为.write()and p2.read()。没有什么。现在让我们尝试相反的方法:

def get_output(subp):
    output = subp.communicate()[0] # blocks on thread
    print 'GOT:', output

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,)) 
t.start()
p1.communicate('data\n') # blocks here.
t.join()

代码最终在某处阻塞。在衍生线程中,或在主线程中,或两者兼而有之。所以它没有用。如果你知道如何让它工作,如果你能提供工作代码会更容易。我在这里试试。


更新 2

Paul Du Bois 在下面回答了一些信息,所以我做了更多的测试。我已经阅读了整个subprocess.py模块并了解了它的工作原理。因此,我尝试将其完全应用于代码。

我在 linux 上,但由于我正在使用线程进行测试,所以我的第一种方法是复制在subprocess.py'scommunicate()方法上看到的确切 Windows 线程代码,但是对于两个进程而不是一个进程。这是我尝试过的完整列表:

import os
from subprocess import Popen, PIPE
import threading

def get_output(fobj, buffer):
    while True:
        chunk = fobj.read() # BLOCKS HERE
        if not chunk:
            break
        buffer.append(chunk)

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

b = [] # create a buffer
t = threading.Thread(target=get_output, args=(p2.stdout, b))
t.start() # start reading thread

for x in xrange(100000):
    p1.stdin.write('hello world\n') # write data
    p1.stdin.flush()
p1.stdin.close() # close input...
t.join()

出色地。它没有用。即使在p1.stdin.close()被调用之后,p2.stdout.read()仍然会阻塞。

然后我尝试了posix代码subprocess.py

import os
from subprocess import Popen, PIPE
import select

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

numwrites = 100000
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer

while to_read or to_write:
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        data = os.read(p2.stdout.fileno(), 1024)
        if not data:
            p2.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        if numwrites > 0:
            numwrites -= 1
            p1.stdin.write('hello world!\n'); p1.stdin.flush()
        else:
            p1.stdin.close()
            to_write = []

print b

也阻止select.select(). 通过传播prints,我发现了这一点:

  • 阅读是有效的。代码在执行过程中多次读取。
  • 写作也在工作。数据写入p1.stdin.
  • 结束时numwritesp1.stdin.close()被调用。
  • select()开始阻塞时,只有to_read一些东西,p2.stdoutto_write已经是空的了。
  • os.read()call 总是返回一些东西,所以p2.stdout.close()永远不会被调用。

两个测试的结论:关闭stdin管道上的第一个进程(grep在示例中)不会使其将其缓冲输出转储到下一个进程并死掉。

没有办法让它工作?

PS:我不想使用临时文件,我已经用文件进行了测试,我知道它可以工作。而且我不想使用窗户。

4

11 回答 11

22

我发现了如何做到这一点。

它与线程无关,与 select() 无关。

当我运行第一个进程 ( grep) 时,它会创建两个低级文件描述符,每个管道一个。让我们称之为ab

当我运行第二个进程时,b被传递给cut sdtin. Popen但是-有一个脑死亡的默认值close_fds=False

那样的效果,cut也是继承a。所以grep即使我关闭也不能死a,因为 stdin 仍然在cut's process 上打开(cut忽略它)。

以下代码现在可以完美运行。

from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n')
p1.stdin.close()
result = p2.stdout.read() 
assert result == "Hello Worl\n"

close_fds=True应该是unix 系统上的默认值。在 Windows 上,它会关闭所有fd,因此它可以防止管道。

编辑:

PS:对于阅读此答案有类似问题的人:正如pooryorick在评论中所说,如果写入的数据p1.stdin大于缓冲区,这也可能会阻塞。在这种情况下,您应该将数据分成更小的部分,并用于select.select()知道何时读取/写入。问题中的代码应该提示如何实现它。

EDIT2:在pooryorick的更多帮助下找到了另一个解决方案-在执行第二个进程时,可以关闭属于第一个进程的s,而不是使用close_fds=True和关闭所有fds,它会起作用。fd关闭必须在孩子中完成,所以preexec_fnPopen 的函数非常方便地做到这一点。在执行 p2 时,您可以执行以下操作:

p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=devnull, preexec_fn=p1.stdin.close)
于 2009-10-23T23:33:38.893 回答
7

处理大文件

在 Python 中处理大文件时,需要统一应用两个原则。

  1. 由于任何 IO 例程都可能阻塞,因此我们必须将管道的每个阶段保持在不同的线程或进程中。我们在这个例子中使用了线程,但是子进程可以让你避开 GIL。
  2. 我们必须使用增量读取EOF和写入,以便我们在开始取得进展之前不要等待。

另一种方法是使用非阻塞 IO,尽管这在标准 Python 中很麻烦。有关使用非阻塞原语实现同步 IO API 的轻量级线程库,请参阅gevent

示例代码

我们将构建一个愚蠢的管道,大致是

{cat /usr/share/dict/words} | grep -v not              \
    | {upcase, filtered tee to stderr} | cut -c 1-10   \
    | {translate 'E' to '3'} | grep K | grep Z | {downcase}

其中大括号中的每个阶段{}都是用 Python 实现的,而其他阶段则使用标准的外部程序。TL;DR: 见这个要点

我们从预期的进口开始。

#!/usr/bin/env python
from subprocess import Popen, PIPE
import sys, threading

管道的 Python 阶段

除了最后一个 Python 实现的管道阶段之外的所有阶段都需要进入一个线程,这样它的 IO 就不会阻塞其他阶段。如果您希望它们实际并行运行(避免使用 GIL),它们可以改为在 Python 子进程中运行。

def writer(output):
    for line in open('/usr/share/dict/words'):
        output.write(line)
    output.close()
def filter(input, output):
    for line in input:
        if 'k' in line and 'z' in line: # Selective 'tee'
            sys.stderr.write('### ' + line)
        output.write(line.upper())
    output.close()
def leeter(input, output):
    for line in input:
        output.write(line.replace('E', '3'))
    output.close()

这些中的每一个都需要放在自己的线程中,我们将使用这个便利函数来完成。

def spawn(func, **kwargs):
    t = threading.Thread(target=func, kwargs=kwargs)
    t.start()
    return t

创建管道

使用 .创建外部阶段Popen和使用spawn. 该参数bufsize=-1说使用系统默认缓冲(通常为 4 kiB)。这通常比默认(无缓冲)或行缓冲更快,但如果您想直观地监控输出而没有滞后,则需要行缓冲。

grepv   = Popen(['grep','-v','not'], stdin=PIPE, stdout=PIPE, bufsize=-1)
cut     = Popen(['cut','-c','1-10'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepk = Popen(['grep', 'K'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepz = Popen(['grep', 'Z'], stdin=grepk.stdout, stdout=PIPE, bufsize=-1)

twriter = spawn(writer, output=grepv.stdin)
tfilter = spawn(filter, input=grepv.stdout, output=cut.stdin)
tleeter = spawn(leeter, input=cut.stdout, output=grepk.stdin)

推动管道

如上组装,管道中的所有缓冲区都将填满,但由于没有人从末尾(grepz.stdout)读取,它们将全部阻塞。我们可以在一次调用中读取整个内容grepz.stdout.read(),但这会占用大量内存来存储大文件。相反,我们逐渐阅读。

for line in grepz.stdout:
    sys.stdout.write(line.lower())

线程和进程到达EOF. 我们可以明确地使用

for t in [twriter, tfilter, tleeter]: t.join()
for p in [grepv, cut, grepk, grepz]: p.wait()

Python-2.6 及更早版本

在内部,subprocess.Popen调用fork、配置管道文件描述符和调用exec. 子进程 fromfork拥有父进程中所有文件描述符的副本,并且需要关闭两个EOF副本,然后相应的读取器才能获得. 这可以通过手动关闭管道(通过close_fds=True或适当的preexec_fn参数subprocess.Popen)或通过将FD_CLOEXEC标志设置为exec自动关闭文件描述符来解决。此标志在 Python-2.7 及更高版本中自动设置,请参阅issue12786。我们可以通过调用来获得 Python 早期版本中的 Python-2.7 行为

p._set_cloexec_flags(p.stdin)

p.stdin作为参数传递给后续subprocess.Popen.

于 2012-12-27T20:49:46.720 回答
3

使管道按预期工作的三个主要技巧

  1. 确保管道的每一端都用于不同的线程/进程(顶部附近的一些示例存在此问题)。

  2. 在每个进程中显式关闭管道的未使用端

  3. 通过禁用它(Python -u 选项)、使用 pty 或简单地用不会影响数据的东西填充缓冲区来处理缓冲(可能是 '\n',但任何合适的东西)。

Python“管道”模块(我是作者)中的示例完全适合您的场景,并使低级步骤相当清晰。

http://pypi.python.org/pypi/pipeline/

最近,我将 subprocess 模块用作生产者-处理器-消费者-控制器模式的一部分:

http://www.darkarchive.org/w/Pub/PythonInteract

此示例处理缓冲标准输入,而不使用 pty,并且还说明了哪些管道末端应该在哪里关闭。我更喜欢进程而不是线程,但原理是一样的。此外,它还说明了向生产者提供数据并从消费者收集输出的同步队列,以及如何干净地关闭它们(注意插入队列中的哨兵)。这种模式允许根据最近的输出生成新的输入,从而允许递归发现和处理。

于 2010-01-31T20:12:58.360 回答
3

如果太多数据被写入管道的接收端,Nosklo 提供的解决方案将很快中断:


from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n' * 20000)
p1.stdin.close()
result = p2.stdout.read() 
assert result == "Hello Worl\n"

如果此脚本没有挂在您的机器上,只需将“20000”增加到超过操作系统管道缓冲区大小的值。

这是因为操作系统正在缓冲“grep”的输入,但是一旦该缓冲区已满,p1.stdin.write调用将阻塞,直到从p2.stdout. 在玩具场景中,您可以在同一进程中写入/读取管道,但在正常使用中,需要从一个线程/进程写入并从单独的线程/进程读取。这适用于 subprocess.popen、os.pipe、os.popen* 等。

另一个转折点是,有时您希望继续向管道提供从同一管道的早期输出生成的项目。解决方案是让pipe feeder和pipe reader都与man程序异步,并实现两个队列:一个在主程序和pipe feeder之间,一个在主程序和pipe reader之间。 PythonInteract就是一个例子。

Subprocess 是一个很好的便利模型,但是因为它隐藏了它在后台执行的 os.popen 和 os.fork 调用的细节,所以有时它比它使用的低级别调用更难处理。出于这个原因,子进程不是了解进程间管道如何真正工作的好方法。

于 2010-02-08T16:30:25.503 回答
2

回应 nosklo 的断言(请参阅此问题的其他评论),它不能没有close_fds=True

close_fds=True仅当您打开其他文件描述符时才需要。打开多个子进程时,最好跟踪可能被继承的打开文件,并明确关闭任何不需要的文件:

from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p1.stdin.write('Hello World\n')
p1.stdin.close()
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
result = p2.stdout.read() 
assert result == "Hello Worl\n"

close_fds默认为,False因为 subprocess 更愿意相信调用程序知道它对打开的文件描述符做了什么,并且只需为调用者提供一个简单的选项来关闭它们,如果这是它想要做的。

但真正的问题是管道缓冲区会咬你除了玩具示例之外的所有内容。正如我在这个问题的其他答案中所说,经验法则是不要让你的读者和你的作者在同一个进程/线程中打开。任何想要使用 subprocess 模块进行双向通信的人都可以先学习 os.pipe 和 os.fork。如果你有一个很好的例子可以看,它们实际上并不难使用。

于 2010-02-10T20:49:51.323 回答
2

您必须在多个线程中执行此操作。否则,您最终会遇到无法发送数据的情况:子 p1 不会读取您的输入,因为 p2 不会读取 p1 的输出,因为您没有读取 p2 的输出。

所以你需要一个后台线程来读取 p2 写出的内容。这将允许 p2 在将一些数据写入管道后继续,因此它可以从 p1 读取下一行输入,这再次允许 p1 处理您发送给它的数据。

或者,您可以使用后台线程将数据发送到 p1,并在主线程中从 p2 读取输出。但任何一方都必须是一个线程。

于 2009-10-20T15:41:29.723 回答
1

I think you may be examining the wrong problem. Certainly as Aaron says if you try to be both a producer to the beginning of a pipeline, and a consumer of the end of the pipeline, it is easy to get into a deadlock situation. This is the problem that communicate() solves.

communicate() isn't exactly correct for you since stdin and stdout are on different subprocess objects; but if you take a look at the implementation in subprocess.py you'll see that it does exactly what Aaron suggested.

Once you see that communicate both reads and writes, you'll see that in your second try communicate() competes with p2 for the output of p1:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
# ...
p1.communicate('data\n')       # reads from p1.stdout, as does p2

I am running on win32, which definitely has different i/o and buffering characteristics, but this works for me:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,)) 
t.start()
p1.stdin.write('hello world\n' * 100000)
p1.stdin.close()
t.join()

I tuned the input size to produce a deadlock when using a naive unthreaded p2.read()

You might also try buffering into a file, eg

fd, _ = tempfile.mkstemp()
os.write(fd, 'hello world\r\n' * 100000)
os.lseek(fd, 0, os.SEEK_SET)
p1 = Popen(["grep", "-v", "not"], stdin=fd, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
print p2.stdout.read()

That also works for me without deadlocks.

于 2009-10-23T00:49:36.160 回答
1

在上面的其中一条评论中,我向 nosklo 提出挑战,要么发布一些代码来支持他的断言,select.select要么对我之前投反对票的回复进行投票。他回复了以下代码:

from subprocess import Popen, PIPE
import select

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)

data_to_write = 100000 * 'hello world\n'
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer
written = 0


while to_read or to_write:
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        data = p2.stdout.read(1024)
        if not data:
            p2.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        if written < len(data_to_write):
            part = data_to_write[written:written+1024]
            written += len(part)
            p1.stdin.write(part); p1.stdin.flush()
        else:
            p1.stdin.close()
            to_write = []

print b

这个脚本的一个问题是它会猜测系统管道缓冲区的大小/性质。如果脚本可以删除像 1024 这样的幻数,则该脚本会遇到更少的失败。

最大的问题是,这个脚本代码只能在数据输入和外部程序的正确组合下始终如一地工作。grep 和 cut 都使用行,因此它们的内部缓冲区的行为有点不同。如果我们使用更通用的命令,如“cat”,并将较小的数据位写入管道,致命的竞争条件将更频繁地弹出:

from subprocess import Popen, PIPE
import select
import time

p1 = Popen(["cat"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cat"], stdin=p1.stdout, stdout=PIPE, close_fds=True)

data_to_write = 'hello world\n'
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer
written = 0


while to_read or to_write:
    time.sleep(1)
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        print 'I am reading now!'
        data = p2.stdout.read(1024)
        if not data:
            p1.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        print 'I am writing now!'
        if written < len(data_to_write):
            part = data_to_write[written:written+1024]
            written += len(part)
            p1.stdin.write(part); p1.stdin.flush()
        else:
            print 'closing file'
            p1.stdin.close()
            to_write = []

print b

在这种情况下,会出现两种不同的结果:

write, write, close file, read -> success
write, read -> hang

因此,我再次向 nosklo 提出挑战,要么发布代码来显示 select.select用于处理来自单个线程的任意输入和管道缓冲,要么支持我的响应。

底线:不要试图从单个线程操作管道的两端。这不值得。有关如何正确执行此操作的低级示例,请参阅 管道。

于 2010-02-10T19:14:16.060 回答
0

使用 SpooledTemporaryFile 怎么样?这绕过(但可能无法解决)问题:

http://docs.python.org/library/tempfile.html#tempfile.SpooledTemporaryFile

您可以像写入文件一样写入它,但它实际上是一个内存块。

还是我完全误会了...

于 2009-10-23T02:20:03.187 回答
-1

这是一个使用 Popen 和 os.fork 来完成同样事情的例子。而不是使用close_fds它只是在正确的位置关闭管道。比尝试使用简单得多select.select,并且充分利用了系统管道缓冲区。

from subprocess import Popen, PIPE
import os
import sys

p1 = Popen(["cat"], stdin=PIPE, stdout=PIPE)

pid = os.fork()

if pid: #parent
    p1.stdin.close()
    p2 = Popen(["cat"], stdin=p1.stdout, stdout=PIPE)
    data = p2.stdout.read()
    sys.stdout.write(data)
    p2.stdout.close()

else: #child
    data_to_write = 'hello world\n' * 100000
    p1.stdin.write(data_to_write)
    p1.stdin.close()
于 2010-02-11T15:44:39.133 回答
-1

它比你想象的要简单得多!

import sys
from subprocess import Popen, PIPE

# Pipe the command here. It will read from stdin.
#   So cat a file, to stdin, like (cat myfile | ./this.py),
#     or type on terminal and hit control+d when done, etc
#   No need to handle this yourself, that's why we have shell's!
p = Popen("grep -v not | cut -c 1-10", shell=True, stdout=PIPE)

nextData = None
while True:
    nextData = p.stdout.read()
    if nextData in (b'', ''):
        break
    sys.stdout.write ( nextData.decode('utf-8') )


p.wait()

此代码是为 python 3.6 编写的,适用于 python 2.7。

像这样使用它:

cat README.md  | python ./example.py

或者

python example.py < README.md

将“README.md”的内容通过管道传输到该程序。

但是..在这一点上,为什么不直接使用“cat”,然后像你想要的那样管道输出呢?喜欢:

cat filename | grep -v not | cut -c 1-10

输入控制台也可以完成这项工作。如果我进一步处理输出,我个人只会使用 code 选项,否则 shell 脚本会更容易维护和保留。

你只是,使用外壳为你做管道。一进一出。这就是她擅长做的事情、管理流程和管理输入和输出的单宽度链。有些人会称其为 shell 最好的非交互功能。

于 2017-07-19T03:02:28.500 回答