
In a simple script that uses subprocess to gzip output (piping to the stdin of an external command via subprocess.PIPE), if a multiprocessing.Pool is created between the time the subprocess is created and the time its stdin is closed, proc.wait() hangs forever.

import multiprocessing
import subprocess

proc = subprocess.Popen(["gzip", "-c", "-"],
                        stdout=open('filename', 'w'), stdin=subprocess.PIPE)
multiprocessing.Pool()  # creating the Pool here triggers the hang
proc.stdin.close()
proc.wait()             # never returns

Moving the multiprocessing.Pool call one line up or one line down prevents the problem.
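For reference, a sketch of one of those orderings, with the Pool created before the Popen, which does not hang for me:

import multiprocessing
import subprocess

# Same trivial example, but the Pool is created before the pipe exists.
pool = multiprocessing.Pool()
proc = subprocess.Popen(["gzip", "-c", "-"],
                        stdout=open('filename', 'w'), stdin=subprocess.PIPE)
proc.stdin.close()
proc.wait()             # returns as expected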

I am experiencing this on Python 2.7.3 (Linux) and Python 2.7.1 (OS X).

Obviously, this is a trivial example -- the real usage is much more complex. I'm also already aware of GzipFile -- I would prefer not to use it; using subprocess lets me get more CPU usage by splitting the gzipping into a separate process.

I can't see how simply instantiating a Pool should have this impact.


1 Answer


When you invoke multiprocessing.Pool, the multiprocessing module creates several new processes (using os.fork or similar).

By default, during a fork, new processes inherit all open file descriptors.

When you invoke subprocess.Popen with a subprocess.PIPE argument, the subprocess module creates some new pipe file descriptors to send data to/from the new process. In this particular case, the pipe is used to send data from the parent process (python) to the child (gzip), and gzip will exit—and thus make the proc.wait() finish—when all write access to the pipe goes away. (This is what generates "EOF on a pipe": no more write-able file descriptors exist to that pipe.)
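A minimal sketch of that EOF rule, using os.pipe and os.fork directly (fork-based, so Unix only): the parent's read cannot return EOF while a forked child still holds an inherited copy of the write end.

import os
import time

# A reader on a pipe sees EOF only when *every* write descriptor to that
# pipe is closed, in every process that holds one.
r, w = os.pipe()
pid = os.fork()
if pid == 0:          # child: inherited both ends of the pipe via fork
    os.close(r)
    time.sleep(2)     # keep the inherited write end open for a while
    os._exit(0)       # exiting closes the child's copy of w

os.close(w)           # parent gives up its own write end immediately
start = time.time()
os.read(r, 1)         # blocks ~2 s: the child's inherited w keeps the pipe alive
print("EOF after %.1f s" % (time.time() - start))
os.close(r)
os.waitpid(pid, 0)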

Thus, in this case, if you do the following, all in the "original" Python process, in this sequence:

  1. create a pipe
  2. create some multiprocessing.Pool processes
  3. send data to gzip
  4. close the pipe to gzip

then, due to the behavior of fork, each of the Pool processes has an os.dup of the write-to-gzip pipe, so gzip continues waiting for more data, which those Pool processes can (but never do) send. The gzip process will exit as soon as the Pool processes close their pipe descriptors.
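You can see this concretely in the trivial example: shutting the Pool down releases the workers' inherited copies of the descriptor, and proc.wait() then returns. A sketch:

import multiprocessing
import subprocess

proc = subprocess.Popen(["gzip", "-c", "-"],
                        stdout=open('filename', 'w'), stdin=subprocess.PIPE)
pool = multiprocessing.Pool()  # workers fork here and inherit the write end of proc.stdin
proc.stdin.close()             # the parent's write end is now closed...
pool.close()
pool.join()                    # ...and once the workers exit, so are their inherited copies
proc.wait()                    # gzip finally sees EOF and exits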

Fixing this in real (more complicated) code can be nontrivial. Ideally, what you would like is for multiprocessing.Pool to know (magically, somehow) which file descriptors should be retained, and which should not, but this is not as simple as "just close a bunch of descriptors in the created child processes":

output = open('somefile', 'a')

def somefunc(arg):
    # ... do some computation, etc ...
    output.write(result)

pool = multiprocessing.Pool()
pool.map(somefunc, iterable)

Clearly output.fileno() must be shared by the worker processes here.

You could try to use the Pool's initializer to invoke proc.stdin.close (or os.close on a list of fds), but then you have to keep track of which descriptors to close. It's probably simplest to restructure your code to avoid creating a Pool "at the wrong time".
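A minimal sketch of that initializer approach for the trivial example above (close_inherited is just an illustrative name; real code would need to track every descriptor worth closing):

import multiprocessing
import subprocess

proc = subprocess.Popen(["gzip", "-c", "-"],
                        stdout=open('filename', 'w'), stdin=subprocess.PIPE)

def close_inherited():
    # Runs once in each worker right after the fork: drop the worker's
    # inherited copy of the write end of gzip's stdin.
    proc.stdin.close()

pool = multiprocessing.Pool(initializer=close_inherited)
proc.stdin.close()   # parent's copy
proc.wait()          # gzip sees EOF: no write ends remain anywhere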

answered 2013-07-21T03:57:55.667