357

这是运行返回其数据的任意命令stdout或在非零退出代码上引发异常的 Python 代码:

proc = subprocess.Popen(
    cmd,
    stderr=subprocess.STDOUT,  # Merge stdout and stderr
    stdout=subprocess.PIPE,
    shell=True)

communicate用于等待进程退出:

stdoutdata, stderrdata = proc.communicate()

subprocess模块不支持超时——能够终止运行超过 X 秒的进程——因此,communicate可能需要永远运行。

在 Windows 和 Linux 上运行的 Python 程序中实现超时的最简单方法是什么?

4

30 回答 30

214

我不太了解底层细节;但是,鉴于在 python 2.6 中,API 提供了等待线程和终止进程的能力,那么在单独的线程中运行进程呢?

import subprocess, threading

class Command(object):
    def __init__(self, cmd):
        self.cmd = cmd
        self.process = None

    def run(self, timeout):
        def target():
            print 'Thread started'
            self.process = subprocess.Popen(self.cmd, shell=True)
            self.process.communicate()
            print 'Thread finished'

        thread = threading.Thread(target=target)
        thread.start()

        thread.join(timeout)
        if thread.is_alive():
            print 'Terminating process'
            self.process.terminate()
            thread.join()
        print self.process.returncode

command = Command("echo 'Process started'; sleep 2; echo 'Process finished'")
command.run(timeout=3)
command.run(timeout=1)

在我的机器中这个片段的输出是:

Thread started
Process started
Process finished
Thread finished
0
Thread started
Process started
Terminating process
Thread finished
-15

可以看出,在第一次执行中,进程正确完成(返回代码 0),而在第二次执行中,进程被终止(返回代码 -15)。

我没有在 Windows 中测试过;但是,除了更新示例命令之外,我认为它应该可以工作,因为我在文档中没有找到任何说不支持 thread.join 或 process.terminate 的内容。

于 2011-01-28T07:39:08.417 回答
204

In Python 3.3+:

from subprocess import STDOUT, check_output

output = check_output(cmd, stderr=STDOUT, timeout=seconds)

output is a byte string that contains command's merged stdout, stderr data.

check_output raises CalledProcessError on non-zero exit status as specified in the question's text unlike proc.communicate() method.

I've removed shell=True because it is often used unnecessarily. You can always add it back if cmd indeed requires it. If you add shell=True i.e., if the child process spawns its own descendants; check_output() can return much later than the timeout indicates, see Subprocess timeout failure.

The timeout feature is available on Python 2.x via the subprocess32 backport of the 3.2+ subprocess module.

于 2012-10-02T21:06:25.870 回答
150

jcollado 的答案可以使用threading.Timer类来简化:

import shlex
from subprocess import Popen, PIPE
from threading import Timer

def run(cmd, timeout_sec):
    proc = Popen(shlex.split(cmd), stdout=PIPE, stderr=PIPE)
    timer = Timer(timeout_sec, proc.kill)
    try:
        timer.start()
        stdout, stderr = proc.communicate()
    finally:
        timer.cancel()

# Examples: both take 1 second
run("sleep 1", 5)  # process ends normally at 1 second
run("sleep 5", 1)  # timeout happens at 1 second
于 2012-04-04T13:36:49.343 回答
84

如果你在 Unix 上,

import signal
  ...
class Alarm(Exception):
    pass

def alarm_handler(signum, frame):
    raise Alarm

signal.signal(signal.SIGALRM, alarm_handler)
signal.alarm(5*60)  # 5 minutes
try:
    stdoutdata, stderrdata = proc.communicate()
    signal.alarm(0)  # reset the alarm
except Alarm:
    print "Oops, taking too long!"
    # whatever else
于 2009-07-28T01:43:16.680 回答
44

这是 Alex Martelli 的解决方案,作为具有适当进程终止的模块。其他方法不起作用,因为它们不使用 proc.communicate()。因此,如果您有一个产生大量输出的进程,它将填充其输出缓冲区,然后阻塞,直到您从中读取一些内容。

from os import kill
from signal import alarm, signal, SIGALRM, SIGKILL
from subprocess import PIPE, Popen

def run(args, cwd = None, shell = False, kill_tree = True, timeout = -1, env = None):
    '''
    Run a command with a timeout after which it will be forcibly
    killed.
    '''
    class Alarm(Exception):
        pass
    def alarm_handler(signum, frame):
        raise Alarm
    p = Popen(args, shell = shell, cwd = cwd, stdout = PIPE, stderr = PIPE, env = env)
    if timeout != -1:
        signal(SIGALRM, alarm_handler)
        alarm(timeout)
    try:
        stdout, stderr = p.communicate()
        if timeout != -1:
            alarm(0)
    except Alarm:
        pids = [p.pid]
        if kill_tree:
            pids.extend(get_process_children(p.pid))
        for pid in pids:
            # process might have died before getting to this line
            # so wrap to avoid OSError: no such process
            try: 
                kill(pid, SIGKILL)
            except OSError:
                pass
        return -9, '', ''
    return p.returncode, stdout, stderr

def get_process_children(pid):
    p = Popen('ps --no-headers -o pid --ppid %d' % pid, shell = True,
              stdout = PIPE, stderr = PIPE)
    stdout, stderr = p.communicate()
    return [int(p) for p in stdout.split()]

if __name__ == '__main__':
    print run('find /', shell = True, timeout = 3)
    print run('find', shell = True)
于 2010-07-24T19:29:54.940 回答
22

timeout现在在 subprocess 模块中得到支持(从 Python3.3 开始call()):communicate()

import subprocess

subprocess.call("command", timeout=20, shell=True)

这将调用命令并引发异常

subprocess.TimeoutExpired

如果命令在 20 秒后没有完成。

然后,您可以处理异常以继续您的代码,例如:

try:
    subprocess.call("command", timeout=20, shell=True)
except subprocess.TimeoutExpired:
    # insert code here

希望这可以帮助。

于 2015-02-22T16:51:55.150 回答
19

惊讶没有人提到使用timeout

timeout 5 ping -c 3 somehost

这显然不适用于每个用例,但如果您处理一个简单的脚本,这很难被击败。

homebrew也可通过mac 用户在 coreutils 中作为 gtimeout使用。

于 2015-04-21T04:43:41.940 回答
19

Python 3.5开始,有一个新的subprocess.run通用命令(用于替换check_call, check_output...)并且它也具有timeout=参数。

子进程。运行(args, *, stdin=None, input=None, stdout=None, stderr=None, shell=False, cwd=None, timeout=None , check=False, encoding=None, errors=None)

运行args描述的命令。等待命令完成,然后返回一个CompletedProcess实例。

subprocess.TimeoutExpired超时到期时会引发异常。

于 2018-03-30T09:19:58.817 回答
17

我已经修改了 sussudio答案。现在函数返回: ( returncode, stdout, stderr, timeout) -stdout并被stderr解码为 utf-8 字符串

def kill_proc(proc, timeout):
  timeout["value"] = True
  proc.kill()

def run(cmd, timeout_sec):
  proc = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  timeout = {"value": False}
  timer = Timer(timeout_sec, kill_proc, [proc, timeout])
  timer.start()
  stdout, stderr = proc.communicate()
  timer.cancel()
  return proc.returncode, stdout.decode("utf-8"), stderr.decode("utf-8"), timeout["value"]
于 2012-05-26T18:30:19.987 回答
10

另一种选择是写入临时文件以防止标准输出阻塞,而不是需要使用通信()进行轮询。这对我有用,而其他答案却没有;例如在窗户上。

    outFile =  tempfile.SpooledTemporaryFile() 
    errFile =   tempfile.SpooledTemporaryFile() 
    proc = subprocess.Popen(args, stderr=errFile, stdout=outFile, universal_newlines=False)
    wait_remaining_sec = timeout

    while proc.poll() is None and wait_remaining_sec > 0:
        time.sleep(1)
        wait_remaining_sec -= 1

    if wait_remaining_sec <= 0:
        killProc(proc.pid)
        raise ProcessIncompleteError(proc, timeout)

    # read temp streams from start
    outFile.seek(0);
    errFile.seek(0);
    out = outFile.read()
    err = errFile.read()
    outFile.close()
    errFile.close()
于 2010-11-10T12:51:15.653 回答
6

这是我的解决方案,我使用的是线程和事件:

import subprocess
from threading import Thread, Event

def kill_on_timeout(done, timeout, proc):
    if not done.wait(timeout):
        proc.kill()

def exec_command(command, timeout):

    done = Event()
    proc = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    watcher = Thread(target=kill_on_timeout, args=(done, timeout, proc))
    watcher.daemon = True
    watcher.start()

    data, stderr = proc.communicate()
    done.set()

    return data, stderr, proc.returncode

在行动:

In [2]: exec_command(['sleep', '10'], 5)
Out[2]: ('', '', -9)

In [3]: exec_command(['sleep', '10'], 11)
Out[3]: ('', '', 0)
于 2014-07-02T19:59:19.837 回答
6

我将带有 threading from 的解决方案添加jcollado到我的 Python 模块easyprocess中。

安装:

pip install easyprocess

例子:

from easyprocess import Proc

# shell is not supported!
stdout=Proc('ping localhost').call(timeout=1.5).stdout
print stdout
于 2011-04-10T12:28:02.577 回答
6

预先添加 Linux 命令timeout并不是一个糟糕的解决方法,它对我有用。

cmd = "timeout 20 "+ cmd
subprocess.Popen(cmd.split(), stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(output, err) = p.communicate()
于 2018-02-09T05:22:39.683 回答
5

我使用的解决方案是在 shell 命令前加上timelimit。如果命令花费的时间太长,timelimit 将停止它,并且 Popen 将有一个由 timelimit 设置的返回码。如果 > 128,则意味着 timelimit 杀死了该进程。

另请参阅具有超时和大输出 (>64K) 的 python 子进程

于 2011-12-14T16:14:40.717 回答
5

如果您使用的是 python 2,请尝试一下

import subprocess32

try:
    output = subprocess32.check_output(command, shell=True, timeout=3)
except subprocess32.TimeoutExpired as e:
    print e
于 2016-07-15T08:10:19.310 回答
3

我已经实现了我可以从其中一些中收集到的东西。这适用于 Windows,由于这是一个社区 wiki,我想我也会分享我的代码:

class Command(threading.Thread):
    def __init__(self, cmd, outFile, errFile, timeout):
        threading.Thread.__init__(self)
        self.cmd = cmd
        self.process = None
        self.outFile = outFile
        self.errFile = errFile
        self.timed_out = False
        self.timeout = timeout

    def run(self):
        self.process = subprocess.Popen(self.cmd, stdout = self.outFile, \
            stderr = self.errFile)

        while (self.process.poll() is None and self.timeout > 0):
            time.sleep(1)
            self.timeout -= 1

        if not self.timeout > 0:
            self.process.terminate()
            self.timed_out = True
        else:
            self.timed_out = False

然后从另一个类或文件:

        outFile =  tempfile.SpooledTemporaryFile()
        errFile =   tempfile.SpooledTemporaryFile()

        executor = command.Command(c, outFile, errFile, timeout)
        executor.daemon = True
        executor.start()

        executor.join()
        if executor.timed_out:
            out = 'timed out'
        else:
            outFile.seek(0)
            errFile.seek(0)
            out = outFile.read()
            err = errFile.read()

        outFile.close()
        errFile.close()
于 2011-02-02T18:47:14.870 回答
2

一旦您了解了 *unix 中的全流程运行机制,您将很容易找到更简单的解决方案:

考虑这个简单的例子,如何使用 select.select() (现在几乎在 *nix 上几乎所有地方都可用)来制作可超时的communicate() 方法。这也可以用 epoll/poll/kqueue 编写,但是 select.select() 变体可能是一个很好的例子。select.select() 的主要限制(速度和 1024 max fds)不适用于您的任务。

这在 *nix 下工作,不创建线程,不使用信号,可以从任何线程(不仅是主线程)启动,并且足够快以从我的机器(i5 2.3ghz)上的标准输出读取 250mb/s 的数据。

在通信结束时加入 stdout/stderr 时出现问题。如果您有大量程序输出,这可能会导致大量内存使用。但是你可以用较小的超时时间多次调用communicate()。

class Popen(subprocess.Popen):
    def communicate(self, input=None, timeout=None):
        if timeout is None:
            return subprocess.Popen.communicate(self, input)

        if self.stdin:
            # Flush stdio buffer, this might block if user
            # has been writing to .stdin in an uncontrolled
            # fashion.
            self.stdin.flush()
            if not input:
                self.stdin.close()

        read_set, write_set = [], []
        stdout = stderr = None

        if self.stdin and input:
            write_set.append(self.stdin)
        if self.stdout:
            read_set.append(self.stdout)
            stdout = []
        if self.stderr:
            read_set.append(self.stderr)
            stderr = []

        input_offset = 0
        deadline = time.time() + timeout

        while read_set or write_set:
            try:
                rlist, wlist, xlist = select.select(read_set, write_set, [], max(0, deadline - time.time()))
            except select.error as ex:
                if ex.args[0] == errno.EINTR:
                    continue
                raise

            if not (rlist or wlist):
                # Just break if timeout
                # Since we do not close stdout/stderr/stdin, we can call
                # communicate() several times reading data by smaller pieces.
                break

            if self.stdin in wlist:
                chunk = input[input_offset:input_offset + subprocess._PIPE_BUF]
                try:
                    bytes_written = os.write(self.stdin.fileno(), chunk)
                except OSError as ex:
                    if ex.errno == errno.EPIPE:
                        self.stdin.close()
                        write_set.remove(self.stdin)
                    else:
                        raise
                else:
                    input_offset += bytes_written
                    if input_offset >= len(input):
                        self.stdin.close()
                        write_set.remove(self.stdin)

            # Read stdout / stderr by 1024 bytes
            for fn, tgt in (
                (self.stdout, stdout),
                (self.stderr, stderr),
            ):
                if fn in rlist:
                    data = os.read(fn.fileno(), 1024)
                    if data == '':
                        fn.close()
                        read_set.remove(fn)
                    tgt.append(data)

        if stdout is not None:
            stdout = ''.join(stdout)
        if stderr is not None:
            stderr = ''.join(stderr)

        return (stdout, stderr)
于 2012-05-11T13:46:13.000 回答
2

在 Python 3.7.8 中测试超时后捕获的输出示例:

try:
    return subprocess.run(command, shell=True, capture_output=True, timeout=20, cwd=cwd, universal_newlines=True)
except subprocess.TimeoutExpired as e:
    print(e.output.decode(encoding="utf-8", errors="ignore"))
    assert False;

异常 subprocess.TimeoutExpired 具有输出和其他成员:

cmd - 用于生成子进程的命令。

timeout - 以秒为单位的超时。

output - 子进程的输出,如果它被 run() 或 check_output() 捕获。否则,无。

stdout - 输出别名,用于与 stderr 对称。

stderr - 子进程的标准错误输出(如果它被 run() 捕获)。否则,无。

更多信息:https ://docs.python.org/3/library/subprocess.html#subprocess.TimeoutExpired

于 2020-11-26T18:04:54.807 回答
2

蟒蛇2.7

import time
import subprocess

def run_command(cmd, timeout=0):
    start_time = time.time()
    df = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    while timeout and df.poll() == None:
        if time.time()-start_time >= timeout:
            df.kill()
            return -1, ""
    output = '\n'.join(df.communicate()).strip()
    return df.returncode, output
于 2018-11-08T00:11:38.927 回答
2

您可以使用select

import subprocess
from datetime import datetime
from select import select

def call_with_timeout(cmd, timeout):
    started = datetime.now()
    sp = subprocess.Popen(cmd, stdout=subprocess.PIPE)
    while True:
        p = select([sp.stdout], [], [], timeout)
        if p[0]:
            p[0][0].read()
        ret = sp.poll()
        if ret is not None:
            return ret
        if (datetime.now()-started).total_seconds() > timeout:
            sp.kill()
            return None
于 2016-12-30T19:12:27.417 回答
1

我已经在 Windows、Linux 和 Mac 上成功使用了 killableprocess 。如果您使用的是 Cygwin Python,则需要OSAF 的 killableprocess 版本,否则本机 Windows 进程将不会被杀死。

于 2009-10-12T23:15:47.083 回答
1

虽然我没有深入研究它,但我在 ActiveState 找到的这个装饰器似乎对这类事情非常有用。除了subprocess.Popen(..., close_fds=True),至少我已经准备好在 Python 中编写 shell 脚本了。

于 2011-08-24T01:42:21.763 回答
1

有一个想法是继承 Popen 类并使用一些简单的方法装饰器对其进行扩展。我们称之为 ExpirablePopen。

from logging import error
from subprocess import Popen
from threading import Event
from threading import Thread


class ExpirablePopen(Popen):

    def __init__(self, *args, **kwargs):
        self.timeout = kwargs.pop('timeout', 0)
        self.timer = None
        self.done = Event()

        Popen.__init__(self, *args, **kwargs)

    def __tkill(self):
        timeout = self.timeout
        if not self.done.wait(timeout):
            error('Terminating process {} by timeout of {} secs.'.format(self.pid, timeout))
            self.kill()

    def expirable(func):
        def wrapper(self, *args, **kwargs):
            # zero timeout means call of parent method
            if self.timeout == 0:
                return func(self, *args, **kwargs)

            # if timer is None, need to start it
            if self.timer is None:
                self.timer = thr = Thread(target=self.__tkill)
                thr.daemon = True
                thr.start()

            result = func(self, *args, **kwargs)
            self.done.set()

            return result
        return wrapper

    wait = expirable(Popen.wait)
    communicate = expirable(Popen.communicate)


if __name__ == '__main__':
    from subprocess import PIPE

    print ExpirablePopen('ssh -T git@bitbucket.org', stdout=PIPE, timeout=1).communicate()
于 2016-12-19T12:18:44.547 回答
1

我有一个问题,如果多线程子进程花费的时间超过给定的超时长度,我想终止它。我想在 中设置超时Popen(),但它不起作用。然后,我意识到Popen().wait()等于call(),所以我有了在.wait(timeout=xxx)方法中设置超时的想法,最终奏效了。因此,我以这种方式解决了它:

import os
import sys
import signal
import subprocess
from multiprocessing import Pool

cores_for_parallelization = 4
timeout_time = 15  # seconds

def main():
    jobs = [...YOUR_JOB_LIST...]
    with Pool(cores_for_parallelization) as p:
        p.map(run_parallel_jobs, jobs)

def run_parallel_jobs(args):
    # Define the arguments including the paths
    initial_terminal_command = 'C:\\Python34\\python.exe'  # Python executable
    function_to_start = 'C:\\temp\\xyz.py'  # The multithreading script
    final_list = [initial_terminal_command, function_to_start]
    final_list.extend(args)

    # Start the subprocess and determine the process PID
    subp = subprocess.Popen(final_list)  # starts the process
    pid = subp.pid

    # Wait until the return code returns from the function by considering the timeout. 
    # If not, terminate the process.
    try:
        returncode = subp.wait(timeout=timeout_time)  # should be zero if accomplished
    except subprocess.TimeoutExpired:
        # Distinguish between Linux and Windows and terminate the process if 
        # the timeout has been expired
        if sys.platform == 'linux2':
            os.kill(pid, signal.SIGTERM)
        elif sys.platform == 'win32':
            subp.terminate()

if __name__ == '__main__':
    main()
于 2017-11-19T12:37:09.433 回答
1

迟到的答案Linux仅适用于,但如果有人想使用subprocess.getstatusoutput(),超时参数不可用,您可以在命令开头使用内置的 Linux 超时,即:

import subprocess

timeout = 25 # seconds
cmd = f"timeout --preserve-status --foreground {timeout} ping duckgo.com"
exit_c, out = subprocess.getstatusoutput(cmd)

if (exit_c == 0):
    print("success")
else:
    print("Error: ", out)

timeout论据:

于 2021-04-07T15:17:53.950 回答
1

此解决方案在 shell=True 的情况下终止进程树,将参数传递给进程(或不传递),超时并获取回调的 stdout、stderr 和进程输出(它使用 psutil 作为 kill_proc_tree)。这是基于 SO 中发布的几个解决方案,包括 jcollado 的。在 jcollado 的回答中回复 Anson 和 jradice 的评论。在 Windows Srvr 2012 和 Ubuntu 14.04 中测试。请注意,对于 Ubuntu,您需要将 parent.children(...) 调用更改为 parent.get_children(...)。

def kill_proc_tree(pid, including_parent=True):
  parent = psutil.Process(pid)
  children = parent.children(recursive=True)
  for child in children:
    child.kill()
  psutil.wait_procs(children, timeout=5)
  if including_parent:
    parent.kill()
    parent.wait(5)

def run_with_timeout(cmd, current_dir, cmd_parms, timeout):
  def target():
    process = subprocess.Popen(cmd, cwd=current_dir, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.PIPE)

    # wait for the process to terminate
    if (cmd_parms == ""):
      out, err = process.communicate()
    else:
      out, err = process.communicate(cmd_parms)
    errcode = process.returncode

  thread = Thread(target=target)
  thread.start()

  thread.join(timeout)
  if thread.is_alive():
    me = os.getpid()
    kill_proc_tree(me, including_parent=False)
    thread.join()
于 2016-08-23T19:04:00.883 回答
0

不幸的是,我受到雇主披露源代码的严格政策的约束,因此我无法提供实际代码。但就我的口味而言,最好的解决方案是创建一个覆盖Popen.wait()以轮询而不是无限期等待的子类,并Popen.__init__接受超时参数。一旦你这样做了,所有其他Popen方法(调用wait)将按预期工作,包括communicate.

于 2012-10-01T17:17:49.873 回答
0

对于 python 2.6+,使用 gevent

 from gevent.subprocess import Popen, PIPE, STDOUT

 def call_sys(cmd, timeout):
      p= Popen(cmd, shell=True, stdout=PIPE)
      output, _ = p.communicate(timeout=timeout)
      assert p.returncode == 0, p. returncode
      return output

 call_sys('./t.sh', 2)

 # t.sh example
 sleep 5
 echo done
 exit 1
于 2018-11-01T12:15:03.707 回答
0

https://pypi.python.org/pypi/python-subprocess2为 subprocess 模块提供了扩展,允许您等待一段时间,否则终止。

因此,等待进程终止最多 10 秒,否则终止:

pipe  = subprocess.Popen('...')

timeout =  10

results = pipe.waitOrTerminate(timeout)

这与 windows 和 unix 都兼容。“results”是一个字典,它包含“returnCode”,它是应用程序的返回(如果必须被杀死,则为 None),以及“actionTaken”。如果进程正常完成,这将是“SUBPROCESS2_PROCESS_COMPLETED”,或者是“SUBPROCESS2_PROCESS_TERMINATED”和 SUBPROCESS2_PROCESS_KILLED 的掩码,具体取决于所采取的操作(有关完整详细信息,请参阅文档)

于 2015-09-25T13:45:41.427 回答
-3

只是想写一些更简单的东西。

#!/usr/bin/python

from subprocess import Popen, PIPE
import datetime
import time 

popen = Popen(["/bin/sleep", "10"]);
pid = popen.pid
sttime = time.time();
waittime =  3

print "Start time %s"%(sttime)

while True:
    popen.poll();
    time.sleep(1)
    rcode = popen.returncode
    now = time.time();
    if [ rcode is None ]  and  [ now > (sttime + waittime) ] :
        print "Killing it now"
        popen.kill()
于 2014-04-05T21:01:52.543 回答