65

I'm currently using subprocess.Popen(cmd, shell=True).

I'm fairly new to Python, but it "feels" like there ought to be some API that lets me do something similar to:

subprocess.Popen(cmd, shell=True, postexec_fn=function_to_call_on_exit)

I'm doing this so that function_to_call_on_exit can do something based on knowing that cmd has exited (for example, keeping a count of the number of external processes currently running).

I assume I could fairly trivially wrap subprocess in a class that combines threading with the Popen.wait() method, but since I haven't done threading in Python yet, and it seems this might be common enough for an API to exist, I thought I'd try to find one first.

Thanks in advance :)

4

8 Answers

74

You're right - there is no nice API for this. You're also right on your second point - it's trivially easy to design a function that does this for you using threading.

import threading
import subprocess

def popen_and_call(on_exit, popen_args):
    """
    Runs the given args in a subprocess.Popen, and then calls the function
    on_exit when the subprocess completes.
    on_exit is a callable object, and popen_args is a list/tuple of args that
    you would give to subprocess.Popen.
    """
    def run_in_thread(on_exit, popen_args):
        proc = subprocess.Popen(*popen_args)
        proc.wait()
        on_exit()
        return
    thread = threading.Thread(target=run_in_thread, args=(on_exit, popen_args))
    thread.start()
    # returns immediately after the thread starts
    return thread

Even threading is pretty easy in Python, but note that if on_exit() is computationally expensive, you'll want to put it in a separate process instead, using multiprocessing (so that the GIL doesn't slow your program down). It's actually very simple - you can basically just replace all calls to threading.Thread with multiprocessing.Process, since they follow (almost) the same API.
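As a concrete illustration of that swap (my own sketch, not part of the original answer - note the worker must live at module level so it stays picklable), the same helper with multiprocessing might look like:

```python
import multiprocessing
import subprocess

def _run_and_notify(on_exit, popen_args):
    # worker: run the command, wait for it to finish, then fire the callback
    proc = subprocess.Popen(*popen_args)
    proc.wait()
    on_exit()

def popen_and_call_mp(on_exit, popen_args):
    """Like popen_and_call above, but the waiting (and on_exit itself)
    happens in a separate process, so a CPU-heavy callback does not
    hold the GIL. Both arguments must be picklable."""
    process = multiprocessing.Process(target=_run_and_notify,
                                      args=(on_exit, popen_args))
    process.start()
    # returns immediately after the worker process starts
    return process
```

Since the callback runs in a child process, it cannot mutate the parent's state directly; use a multiprocessing.Queue or similar to report results back.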

Answered 2010-04-06T00:27:04.740
21

There is a concurrent.futures module in Python 3.2 (available via pip install futures for older Python < 3.2):

pool = Pool(max_workers=1)
f = pool.submit(subprocess.call, "sleep 2; echo done", shell=True)
f.add_done_callback(callback)

The callback will be called in the same process that called f.add_done_callback().

Full program:

import logging
import subprocess
# to install run `pip install futures` on Python <3.2
from concurrent.futures import ThreadPoolExecutor as Pool

info = logging.getLogger(__name__).info

def callback(future):
    if future.exception() is not None:
        info("got exception: %s" % future.exception())
    else:
        info("process returned %d" % future.result())

def main():
    logging.basicConfig(
        level=logging.INFO,
        format=("%(relativeCreated)04d %(process)05d %(threadName)-10s "
                "%(levelname)-5s %(msg)s"))

    # wait for the process completion asynchronously
    info("begin waiting")
    pool = Pool(max_workers=1)
    f = pool.submit(subprocess.call, "sleep 2; echo done", shell=True)
    f.add_done_callback(callback)
    pool.shutdown(wait=False) # no .submit() calls after that point
    info("continue waiting asynchronously")

if __name__=="__main__":
    main()

Output:

$ python . && python3 .
0013 05382 MainThread INFO  begin waiting
0021 05382 MainThread INFO  continue waiting asynchronously
done
2025 05382 Thread-1   INFO  process returned 0
0007 05402 MainThread INFO  begin waiting
0014 05402 MainThread INFO  continue waiting asynchronously
done
2018 05402 Thread-1   INFO  process returned 0
Answered 2011-03-06T09:43:22.350
16

I modified Daniel G's answer to simply pass the subprocess.Popen args and kwargs as themselves instead of as a separate tuple/list, since I wanted to use keyword arguments with subprocess.Popen.

In my case I had a method postExec() that I wanted to run after subprocess.Popen('exe', cwd=WORKING_DIR).

With the code below, it becomes popenAndCall(postExec, 'exe', cwd=WORKING_DIR).

import threading
import subprocess

def popenAndCall(onExit, *popenArgs, **popenKWArgs):
    """
    Runs a subprocess.Popen, and then calls the function onExit when the
    subprocess completes.

    Use it exactly the way you'd normally use subprocess.Popen, except include a
    callable to execute as the first argument. onExit is a callable object, and
    *popenArgs and **popenKWArgs are simply passed up to subprocess.Popen.
    """
    def runInThread(onExit, popenArgs, popenKWArgs):
        proc = subprocess.Popen(*popenArgs, **popenKWArgs)
        proc.wait()
        onExit()
        return

    thread = threading.Thread(target=runInThread,
                              args=(onExit, popenArgs, popenKWArgs))
    thread.start()

    return thread # returns immediately after the thread starts
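For instance (my own standalone demo, repeating the helper so it runs on its own), a threading.Event used as onExit makes it easy to observe both the callback firing and a keyword argument flowing through to Popen:

```python
import subprocess
import sys
import threading

def popenAndCall(onExit, *popenArgs, **popenKWArgs):
    # same helper as above: wait in a background thread, then call onExit
    def runInThread(onExit, popenArgs, popenKWArgs):
        proc = subprocess.Popen(*popenArgs, **popenKWArgs)
        proc.wait()
        onExit()
    thread = threading.Thread(target=runInThread,
                              args=(onExit, popenArgs, popenKWArgs))
    thread.start()
    return thread

done = threading.Event()
# stdout=subprocess.DEVNULL is a keyword argument handed straight to Popen
popenAndCall(done.set, [sys.executable, "-c", "print('hi')"],
             stdout=subprocess.DEVNULL)
done.wait(timeout=30)  # returns as soon as the callback fires
```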
Answered 2012-05-30T20:39:33.257
7

I had the same problem, and solved it using multiprocessing.Pool. There are two hacky tricks involved:

  1. make the size of the pool 1
  2. pass the iterable arguments within an iterable of length 1

The result is one function executed with a callback on completion.

import multiprocessing

def sub(arg):
    print(arg)            # prints [1, 2, 3, 4, 5]
    return "hello"

def cb(arg):
    print(arg)            # prints ['hello'] - map_async hands the
                          # callback the whole result list

if __name__ == "__main__":
    pool = multiprocessing.Pool(1)
    rval = pool.map_async(sub, [[1, 2, 3, 4, 5]], callback=cb)
    # ... do stuff ...
    pool.close()
    pool.join()           # ensures the callback has fired before exit

In my case, I also wanted the invocation to be non-blocking. Works beautifully.

Answered 2011-01-08T23:03:53.657
2

I was inspired by Daniel G's answer and implemented a very simple use case - in my work I often need to make repeated calls to the same (external) process with different arguments. I had hacked a way to determine when each particular call was done, but now I have a much cleaner way to issue callbacks.

I like this implementation because it is very simple, yet it allows me to issue asynchronous calls to multiple processors (note that I use multiprocessing instead of threading) and receive notification upon completion.

I tested the sample program and it works great. Please edit at will and provide feedback.

import multiprocessing
import subprocess

class Process(object):
    """This class spawns a subprocess asynchronously and calls a
    `callback` upon completion; it is not meant to be instantiated
    directly (derived classes are called instead)"""
    def __call__(self, *args):
        # store the arguments for later retrieval
        self.args = args
        # define the target function to be called by
        # `multiprocessing.Process`
        def target():
            cmd = [self.command] + [str(arg) for arg in self.args]
            process = subprocess.Popen(cmd)
            # the `multiprocessing.Process` process will wait until
            # the call to the `subprocess.Popen` object is completed
            process.wait()
            # upon completion, call `callback`
            return self.callback()
        mp_process = multiprocessing.Process(target=target)
        # this call issues the call to `target`, but returns immediately
        mp_process.start()
        return mp_process

if __name__ == "__main__":

    def squeal(who):
        """this serves as the callback function; its argument is the
        instance of a subclass of Process making the call"""
        print("finished %s calling %s with arguments %s" % (
            who.__class__.__name__, who.command, who.args))

    class Sleeper(Process):
        """Sample implementation of an asynchronous process - define
        the command name (available in the system path) and a callback
        function (previously defined)"""
        command = "./sleeper"
        callback = squeal

    # create an instance of Sleeper - this is the Process object that
    # can be called repeatedly in an asynchronous manner
    sleeper_run = Sleeper()

    # spawn three sleeper runs with different arguments
    sleeper_run(5)
    sleeper_run(2)
    sleeper_run(1)

    # the user should see the following message immediately (even
    # though the Sleeper calls are not done yet)
    print("program continued")

Sample output:

program continued
finished Sleeper calling ./sleeper with arguments (1,)
finished Sleeper calling ./sleeper with arguments (2,)
finished Sleeper calling ./sleeper with arguments (5,)

Following is sleeper.c - the source code of my sample "time-consuming" external process:

#include<stdlib.h>
#include<unistd.h>

int main(int argc, char *argv[]){
  unsigned int t = atoi(argv[1]);
  sleep(t);
  return EXIT_SUCCESS;
}

Compiled as:

gcc -o sleeper sleeper.c
Answered 2011-03-06T07:14:22.537
1

There is also a ProcessPoolExecutor since 3.2 in concurrent.futures (https://docs.python.org/3/library/concurrent.futures.html). Usage is the same as with the ThreadPoolExecutor mentioned above. An on-exit callback is attached via future.add_done_callback() on the future returned by submit().

Answered 2020-05-05T11:49:53.480
0

On POSIX systems, the parent process is notified with a SIGCHLD signal when a child process exits. To run a callback when a subprocess command exits, handle the SIGCHLD signal in the parent. Something like this:

import signal
import subprocess

process = subprocess.Popen('mycmd', shell=True)

def sigchld_handler(signum, frame):
    # This is run when the child exits.
    # Do something here ...
    pass

signal.signal(signal.SIGCHLD, sigchld_handler)
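Two caveats worth noting (my additions, not from the answer): the handler fires for *any* child that exits, and the snippet above never reaps the child, which leaves a zombie. A fuller sketch that reaps inside the handler might look like this - but beware that reaping behind subprocess's back means you can no longer rely on Popen.wait()/poll() for the exit status:

```python
import os
import signal
import subprocess
import sys
import time

reaped = []  # pids collected by the handler, for demonstration

def sigchld_handler(signum, frame):
    # reap every child that has already exited; WNOHANG keeps waitpid
    # from blocking once no more exited children remain
    while True:
        try:
            pid, status = os.waitpid(-1, os.WNOHANG)
        except ChildProcessError:
            return      # no children at all
        if pid == 0:
            return      # children exist, but none have exited yet
        reaped.append(pid)

signal.signal(signal.SIGCHLD, sigchld_handler)

proc = subprocess.Popen([sys.executable, "-c", "pass"])

# wait (interruptibly) for the handler to notice the child exiting
deadline = time.time() + 30
while proc.pid not in reaped and time.time() < deadline:
    time.sleep(0.05)
```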

Note that this will not work on Windows.

Answered 2021-11-13T05:21:28.663
-1

AFAIK there is no such API, at least not in the subprocess module. You need to roll something on your own, possibly using threads.

Answered 2010-04-05T23:52:26.013