I am calling a function in Python which I know may stall and force me to restart the script.
How do I call the function, or what do I wrap it in, so that if it takes longer than 5 seconds the script cancels it and does something else?
You can use the signal package if you are running on UNIX:
In [1]: import signal

# Register a handler for the timeout
In [2]: def handler(signum, frame):
   ...:     print("Forever is over!")
   ...:     raise Exception("end of time")
   ...:

# This function *may* run for an indeterminate time...
In [3]: def loop_forever():
   ...:     import time
   ...:     while 1:
   ...:         print("sec")
   ...:         time.sleep(1)
   ...:

# Register the signal function handler
In [4]: signal.signal(signal.SIGALRM, handler)
Out[4]: 0

# Define a timeout for your function
In [5]: signal.alarm(10)
Out[5]: 0

In [6]: try:
   ...:     loop_forever()
   ...: except Exception as exc:
   ...:     print(exc)
   ...:
sec
sec
sec
sec
sec
sec
sec
sec
Forever is over!
end of time

# Cancel the timer if the function returned before timeout
# (ok, mine won't but yours maybe will :)
In [7]: signal.alarm(0)
Out[7]: 0
Ten seconds after the call to signal.alarm(10), the handler is called. This raises an exception that you can intercept from regular Python code.
This module doesn't play well with threads (but then, who does?)
Note that since we raise an exception when the timeout happens, it may end up caught and ignored inside the function, for example in one such function:
def loop_forever():
    import time
    while 1:
        print('sec')
        try:
            time.sleep(10)
        except:
            continue
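If the wrapped code catches only Exception (rather than using a bare except: as above), one mitigation is to raise a BaseException subclass from the handler so it escapes ordinary handlers. This is only a sketch with hypothetical names, not part of the original answer:

import signal
import time

class FunctionTimedOut(BaseException):   # hypothetical name, not a stdlib class
    pass

def handler(signum, frame):
    raise FunctionTimedOut("end of time")

def stubborn():
    # swallows ordinary exceptions, but not BaseException subclasses
    while 1:
        try:
            time.sleep(10)
        except Exception:
            continue

signal.signal(signal.SIGALRM, handler)
signal.alarm(5)
try:
    stubborn()
except FunctionTimedOut:
    print("timed out, doing something else")
finally:
    signal.alarm(0)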
You can use multiprocessing.Process to do exactly that.
Code:
import multiprocessing
import time

# bar
def bar():
    for i in range(100):
        print("Tick")
        time.sleep(1)

if __name__ == '__main__':
    # Start bar as a process
    p = multiprocessing.Process(target=bar)
    p.start()

    # Wait for 10 seconds or until process finishes
    p.join(10)

    # If thread is still active
    if p.is_alive():
        print("running... let's kill it...")

        # Terminate - may not work if process is stuck for good
        p.terminate()
        # OR Kill - will work for sure, no chance for process to finish nicely however
        # p.kill()

        p.join()
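multiprocessing.Process does not hand back the function's return value. If you also need a result within the time limit, a minimal sketch (function and variable names are illustrative, not from the answer above) is to pass a multiprocessing.Queue and wait on it with a timeout:

import multiprocessing
import queue   # only needed for the Empty exception

def worker(q):
    # put the result on the queue instead of returning it
    q.put(sum(range(1000)))

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    try:
        result = q.get(timeout=10)   # wait at most 10 seconds for a result
    except queue.Empty:
        result = None                # timed out: fall back to a default
        p.terminate()
    p.join()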
How do I call the function or what do I wrap it in so that if it takes longer than 5 seconds the script cancels it?
I posted a gist that solves this question/problem with a decorator and a threading.Timer. Here it is with a breakdown.
It has been tested with Python 2 and 3. It should also work under Unix/Linux and Windows.
First the imports. These attempt to keep the code consistent regardless of the Python version:
from __future__ import print_function
import sys
import threading
from time import sleep
try:
    import thread
except ImportError:
    import _thread as thread
Use version-agnostic code:
try:
    range, _print = xrange, print
    def print(*args, **kwargs):
        flush = kwargs.pop('flush', False)
        _print(*args, **kwargs)
        if flush:
            kwargs.get('file', sys.stdout).flush()
except NameError:
    pass
Now we have imported our functionality from the standard library.
The exit_after decorator. Next we need a function to terminate main() from the child thread:
def quit_function(fn_name):
    # print to stderr, unbuffered in Python 2.
    print('{0} took too long'.format(fn_name), file=sys.stderr)
    sys.stderr.flush() # Python 3 stderr is likely buffered.
    thread.interrupt_main() # raises KeyboardInterrupt
And here is the decorator itself:
def exit_after(s):
    '''
    use as decorator to exit process if
    function takes longer than s seconds
    '''
    def outer(fn):
        def inner(*args, **kwargs):
            timer = threading.Timer(s, quit_function, args=[fn.__name__])
            timer.start()
            try:
                result = fn(*args, **kwargs)
            finally:
                timer.cancel()
            return result
        return inner
    return outer
And here's the usage that directly answers your question about exiting after 5 seconds!:
@exit_after(5)
def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        sleep(1)
    print('countdown finished')
Demo:
>>> countdown(3)
countdown started
3, 2, 1, 0, countdown finished
>>> countdown(10)
countdown started
10, 9, 8, 7, 6, countdown took too long
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 11, in inner
File "<stdin>", line 6, in countdown
KeyboardInterrupt
The second function call will not finish; instead the process should exit with a traceback!
KeyboardInterrupt does not always stop a sleeping thread. Note that sleep will not always be interrupted by a keyboard interrupt, on Python 2 on Windows, e.g.:
@exit_after(1)
def sleep10():
    sleep(10)
    print('slept 10 seconds')
>>> sleep10()
sleep10 took too long # Note that it hangs here about 9 more seconds
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 11, in inner
File "<stdin>", line 3, in sleep10
KeyboardInterrupt
Nor is it likely to interrupt code running in extensions unless it explicitly checks PyErr_CheckSignals(); see Cython, Python and KeyboardInterrupt ignored.
In any case, I would avoid letting a thread sleep more than a second - that's an eon in processor time.
How do I call the function or what do I wrap it in so that if it takes longer than 5 seconds the script cancels it and does something else?
To catch it and do something else, you can catch the KeyboardInterrupt.
>>> try:
... countdown(10)
... except KeyboardInterrupt:
... print('do something else')
...
countdown started
10, 9, 8, 7, 6, countdown took too long
do something else
I have a different proposal which is a pure function (with the same API as the threading suggestion) and seems to work fine (based on suggestions on this thread):
def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
    import signal

    class TimeoutError(Exception):
        pass

    def handler(signum, frame):
        raise TimeoutError()

    # set the timeout handler
    signal.signal(signal.SIGALRM, handler)
    signal.alarm(timeout_duration)
    try:
        result = func(*args, **kwargs)
    except TimeoutError as exc:
        result = default
    finally:
        signal.alarm(0)

    return result
I ran across this thread when searching for a timeout call in unit tests. I didn't find anything simple in the answers or 3rd-party packages, so I wrote the decorator below, which you can drop straight into your code:
import multiprocessing.pool
import functools

def timeout(max_timeout):
    """Timeout decorator, parameter in seconds."""
    def timeout_decorator(item):
        """Wrap the original function."""
        @functools.wraps(item)
        def func_wrapper(*args, **kwargs):
            """Closure for function."""
            pool = multiprocessing.pool.ThreadPool(processes=1)
            async_result = pool.apply_async(item, args, kwargs)
            # raises a TimeoutError if execution exceeds max_timeout
            return async_result.get(max_timeout)
        return func_wrapper
    return timeout_decorator
Then it's as simple as this to time out a test or whatever function you like:
@timeout(5.0) # if execution takes longer than 5 seconds, raise a TimeoutError
def test_base_regression(self):
...
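Outside a test runner, the error raised by AsyncResult.get above is multiprocessing.TimeoutError, so catching the timeout might look like this (flaky_call is a hypothetical slow function, not from the original answer):

import multiprocessing
import time

@timeout(5.0)
def flaky_call():              # hypothetical slow function
    time.sleep(10)

try:
    flaky_call()
except multiprocessing.TimeoutError:
    print('flaky_call took longer than 5 seconds')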
The stopit package, found on PyPI, seems to handle timeouts well.
I like the @stopit.threading_timeoutable decorator, which adds a timeout parameter to the decorated function, and it does what you expect: it stops the function.
Check it out on PyPI: https://pypi.python.org/pypi/stopit
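As a sketch of what that looks like (based on stopit's documented decorator; the default= return value is my assumption about what you want back on timeout):

import stopit
import time

@stopit.threading_timeoutable(default='timed out')
def infinite_loop():
    # never returns under normal circumstances
    while True:
        time.sleep(0.1)

print(infinite_loop(timeout=5))   # the decorator adds the timeout parameter -> 'timed out'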
There are a lot of suggestions, but none using concurrent.futures, which I think is the most legible way to handle this.
from concurrent.futures import ProcessPoolExecutor

# Warning: this does not terminate function if timeout
def timeout_five(fnc, *args, **kwargs):
    with ProcessPoolExecutor() as p:
        f = p.submit(fnc, *args, **kwargs)
        return f.result(timeout=5)
Super simple to read and maintain.
We make a pool, submit a single process, and then wait up to 5 seconds before raising a TimeoutError that you can catch and handle however you need.
Native to Python 3.2+ and backported to 2.7 (pip install futures).
Switching between threads and processes is as simple as replacing ProcessPoolExecutor with ThreadPoolExecutor; see the sketch below.
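For instance, a sketch of the thread-based variant together with catching the timeout (slow() is a placeholder for your stalling function; note that, as with the process version, the executor's with-block still waits for the abandoned worker to finish):

from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time

def timeout_five_threaded(fnc, *args, **kwargs):
    with ThreadPoolExecutor() as p:
        f = p.submit(fnc, *args, **kwargs)
        return f.result(timeout=5)

def slow():                      # placeholder for your stalling function
    time.sleep(10)

try:
    timeout_five_threaded(slow)
except TimeoutError:
    print('timed out, doing something else')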
If you want to terminate the process on timeout, I would suggest looking into Pebble.
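A rough sketch of how that might look with Pebble (from memory of its ProcessPool.schedule API; verify against Pebble's documentation before relying on it):

from concurrent.futures import TimeoutError
from pebble import ProcessPool
import time

def slow(x):
    time.sleep(x)
    return x

if __name__ == '__main__':
    with ProcessPool() as pool:
        future = pool.schedule(slow, args=(10,), timeout=5)
        try:
            print(future.result())
        except TimeoutError:
            print('slow() took longer than 5 seconds and its worker was terminated')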
Great, easy-to-use and reliable PyPI project: timeout-decorator (https://pypi.org/project/timeout-decorator/)
Installation:
pip install timeout-decorator
Usage:
import time
import timeout_decorator

@timeout_decorator.timeout(5)
def mytest():
    print("Start")
    for i in range(1, 10):
        time.sleep(1)
        print("%d seconds have passed" % i)

if __name__ == '__main__':
    mytest()
I am the author of wrapt_timeout_decorator.
Most of the solutions presented here work wonderfully under Linux at first glance - because we have fork() and signals() - but on Windows things look a bit different. And when it comes to subthreads on Linux, you can't use signals anymore.
In order to spawn a process under Windows, it needs to be picklable - and many decorated functions or class methods are not.
So you need to use a better pickler like dill and multiprocess (not pickle and multiprocessing) - that's why you can't use ProcessPoolExecutor (or only with limited functionality).
For the timeout itself - you need to define what the timeout means - because on Windows it takes considerable (and non-determinable) time to spawn the process. This can be tricky for short timeouts. Let's assume spawning the process takes about 0.5 seconds (easily!!!). If you give a timeout of 0.2 seconds, what should happen? Should the function time out after 0.5 + 0.2 seconds (so let the method run for 0.2 seconds)? Or should the called process time out after 0.2 seconds (in which case the decorated function will always time out, because in that time it is not even spawned)?
Nested decorators can also be nasty, and you can't use signals in a subthread. If you want to create a truly universal, cross-platform decorator, all this needs to be taken into consideration (and tested).
Other issues are passing exceptions back to the caller, as well as logging issues (if used in the decorated function - logging to files in another process is not supported).
I tried to cover all the edge cases; you might look into the package wrapt_timeout_decorator, or at least test your own solutions inspired by the unit tests used there.
@Alexis Eggermont - unfortunately I don't have enough points to comment - maybe someone else can notify you - I think I solved your import issue.
Building on and enhancing the answer by @piro, you can build a context manager. This allows for very readable code which disables the alarm signal after a successful run (sets signal.alarm(0)):
from contextlib import contextmanager
import signal
import time

@contextmanager
def timeout(duration):
    def timeout_handler(signum, frame):
        raise Exception(f'block timedout after {duration} seconds')
    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(duration)
    yield
    signal.alarm(0)

def sleeper(duration):
    time.sleep(duration)
    print('finished')
Example usage:
In [19]: with timeout(2):
    ...:     sleeper(1)
    ...:
finished

In [20]: with timeout(2):
    ...:     sleeper(3)
    ...:
---------------------------------------------------------------------------
Exception Traceback (most recent call last)
<ipython-input-20-66c78858116f> in <module>()
1 with timeout(2):
----> 2 sleeper(3)
3
<ipython-input-7-a75b966bf7ac> in sleeper(t)
1 def sleeper(t):
----> 2 time.sleep(t)
3 print('finished')
4
<ipython-input-18-533b9e684466> in timeout_handler(signum, frame)
2 def timeout(duration):
3 def timeout_handler(signum, frame):
----> 4 raise Exception(f'block timedout after {duration} seconds')
5 signal.signal(signal.SIGALRM, timeout_handler)
6 signal.alarm(duration)
Exception: block timedout after 2 seconds
timeout-decorator doesn't work on Windows systems, as Windows doesn't support signal well.
If you use timeout-decorator on a Windows system you will get the following:
AttributeError: module 'signal' has no attribute 'SIGALRM'
Some suggested using use_signals=False, but it didn't work for me.
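For reference, the flag is passed to the decorator like this (a sketch only; as noted, it still did not work for the poster):

import time
import timeout_decorator

# use_signals=False asks timeout-decorator to avoid SIGALRM (which Windows lacks)
@timeout_decorator.timeout(5, use_signals=False)
def mytest():
    print("Start")
    for i in range(1, 10):
        time.sleep(1)
        print("%d seconds have passed" % i)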
The author @bitranox created the following package:
pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip
Code sample:
import time
from wrapt_timeout_decorator import *

@timeout(5)
def mytest(message):
    print(message)
    for i in range(1, 10):
        time.sleep(1)
        print('{} seconds have passed'.format(i))

def main():
    mytest('starting')

if __name__ == '__main__':
    main()
Gives the following exception:
TimeoutError: Function mytest timed out after 5 seconds
Raises a TimeoutError exception to alert on the timeout - this can easily be modified. A full explanation and an extension to parallel maps can be found here: https://flipdazed.github.io/blog/quant%20dev/parallel-functions-with-timeouts
>>> @killer_call(timeout=4)
... def bar(x):
...     import time
...     time.sleep(x)
...     return x
>>> bar(10)
Traceback (most recent call last):
  ...
__main__.TimeoutError: function 'bar' timed out after 4s
and, as expected:
>>> bar(2)
2
import multiprocessing as mp
import multiprocessing.queues as mpq
import functools
import dill

from typing import Tuple, Callable, Dict, Optional, Iterable, List, Any


class TimeoutError(Exception):

    def __init__(self, func: Callable, timeout: int):
        self.t = timeout
        self.fname = func.__name__

    def __str__(self):
        return f"function '{self.fname}' timed out after {self.t}s"


def _lemmiwinks(func: Callable, args: Tuple, kwargs: Dict[str, Any], q: mp.Queue):
    """lemmiwinks crawls into the unknown"""
    q.put(dill.loads(func)(*args, **kwargs))


def killer_call(func: Callable = None, timeout: int = 10) -> Callable:
    """
    Single function call with a timeout

    Args:
        func: the function
        timeout: The timeout in seconds
    """

    if not isinstance(timeout, int):
        raise ValueError(f'timeout needs to be an int. Got: {timeout}')

    if func is None:
        return functools.partial(killer_call, timeout=timeout)

    @functools.wraps(killer_call)
    def _inners(*args, **kwargs) -> Any:
        q_worker = mp.Queue()
        proc = mp.Process(target=_lemmiwinks, args=(dill.dumps(func), args, kwargs, q_worker))
        proc.start()
        try:
            return q_worker.get(timeout=timeout)
        except mpq.Empty:
            raise TimeoutError(func, timeout)
        finally:
            try:
                proc.terminate()
            except:
                pass
    return _inners


if __name__ == '__main__':
    @killer_call(timeout=4)
    def bar(x):
        import time
        time.sleep(x)
        return x

    print(bar(2))
    bar(10)
You will need to import inside the function because of the way dill works.
This also means these functions may not be compatible with doctest if you have imports inside your target functions; you will get an issue with __import__ not being found.
We can use signals for the same purpose. I think the example below will be useful for you. It is very simple compared to threads.
import signal

class myException(Exception):
    pass

def timeout(signum, frame):
    raise myException

# this is an infinite loop, never ending under normal circumstances
def main():
    print('Starting Main ', end='')
    while 1:
        print('in main ', end='')

# SIGALRM is only usable on a unix platform
signal.signal(signal.SIGALRM, timeout)

# change 5 to however many seconds you need
signal.alarm(5)

try:
    main()
except myException:
    print("whoops")
Another solution, using asyncio:
If you want to cancel the background task, and not just time out the running main code, then you need explicit communication from the main thread to ask the code of the task to cancel, e.g. a threading.Event() (see the sketch after the code below).
import asyncio
import functools
import multiprocessing
from concurrent.futures.thread import ThreadPoolExecutor


class SingletonTimeOut:
    pool = None

    @classmethod
    def run(cls, to_run: functools.partial, timeout: float):
        pool = cls.get_pool()
        loop = cls.get_loop()
        try:
            task = loop.run_in_executor(pool, to_run)
            return loop.run_until_complete(asyncio.wait_for(task, timeout=timeout))
        except asyncio.TimeoutError as e:
            error_type = type(e).__name__  # TODO
            raise e

    @classmethod
    def get_pool(cls):
        if cls.pool is None:
            cls.pool = ThreadPoolExecutor(multiprocessing.cpu_count())
        return cls.pool

    @classmethod
    def get_loop(cls):
        try:
            return asyncio.get_event_loop()
        except RuntimeError:
            asyncio.set_event_loop(asyncio.new_event_loop())
            # print("NEW LOOP" + str(threading.current_thread().ident))
            return asyncio.get_event_loop()

# ---------------

TIME_OUT = float('0.2')  # seconds

def toto(input_items, nb_predictions):
    return 1

to_run = functools.partial(toto,
                           input_items=1,
                           nb_predictions="a")

results = SingletonTimeOut.run(to_run, TIME_OUT)
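As mentioned above, a timeout alone cannot force a background task to stop. A minimal sketch of cooperative cancellation with threading.Event (names are illustrative, not from the code above):

import threading
import time

def cancellable_work(stop_event):
    # the task has to check the event itself; nothing can force it to stop
    while not stop_event.is_set():
        time.sleep(0.1)          # one small unit of work

stop_event = threading.Event()
worker = threading.Thread(target=cancellable_work, args=(stop_event,))
worker.start()

worker.join(timeout=5)           # wait up to 5 seconds
if worker.is_alive():
    stop_event.set()             # ask the task to stop itself
    worker.join()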
Just in case it is helpful for anyone: building on the answer by @piro, I have made a function decorator:
import time
import signal
from functools import wraps


def timeout(timeout_secs: int):
    def wrapper(func):
        @wraps(func)
        def time_limited(*args, **kwargs):
            # Register a handler for the timeout
            def handler(signum, frame):
                raise Exception(f"Timeout for function '{func.__name__}'")

            # Register the signal function handler
            signal.signal(signal.SIGALRM, handler)

            # Define a timeout for your function
            signal.alarm(timeout_secs)

            result = None
            try:
                result = func(*args, **kwargs)
            except Exception as exc:
                raise exc
            finally:
                # disable the signal alarm
                signal.alarm(0)

            return result
        return time_limited
    return wrapper
Using the wrapper on a function with a 20 seconds timeout would look like:
@timeout(20)
def my_slow_or_never_ending_function(name):
    while True:
        time.sleep(1)
        print(f"Yet another second passed {name}...")

try:
    results = my_slow_or_never_ending_function("Yooo!")
except Exception as e:
    print(f"ERROR: {e}")
#!/usr/bin/python2
import sys, subprocess, threading
proc = subprocess.Popen(sys.argv[2:])
timer = threading.Timer(float(sys.argv[1]), proc.terminate)
timer.start()
proc.wait()
timer.cancel()
exit(proc.returncode)
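For example, if the script is saved as timeout.py (name assumed), running "python2 timeout.py 5 ping example.com" executes the command with a 5 second limit, terminating it when the timer fires and exiting with the child's return code. On Python 3.3+ the standard library offers similar behaviour directly via subprocess.run(..., timeout=...), which raises subprocess.TimeoutExpired when the limit is exceeded.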
I needed nestable timed interrupts (which SIGALRM can't do) that won't get blocked by time.sleep (which the thread-based approach can't do). I ended up copying and lightly modifying the code from here: http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/
The code itself:
#!/usr/bin/python

# lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/

"""alarm.py: Permits multiple SIGALRM events to be queued.

Uses a `heapq` to store the objects to be called when an alarm signal is
raised, so that the next alarm is always at the top of the heap.
"""

import heapq
import signal
from time import time

__version__ = '$Revision: 2539 $'.split()[1]

alarmlist = []

__new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
__next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
__set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))


class TimeoutError(Exception):
    def __init__(self, message, id_=None):
        self.message = message
        self.id_ = id_


class Timeout:
    ''' id_ allows for nested timeouts. '''
    def __init__(self, id_=None, seconds=1, error_message='Timeout'):
        self.seconds = seconds
        self.error_message = error_message
        self.id_ = id_
    def handle_timeout(self):
        raise TimeoutError(self.error_message, self.id_)
    def __enter__(self):
        self.this_alarm = alarm(self.seconds, self.handle_timeout)
    def __exit__(self, type, value, traceback):
        try:
            cancel(self.this_alarm)
        except ValueError:
            pass


def __clear_alarm():
    """Clear an existing alarm.

    If the alarm signal was set to a callable other than our own, queue the
    previous alarm settings.
    """
    oldsec = signal.alarm(0)
    oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
    if oldsec > 0 and oldfunc != __alarm_handler:
        heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))


def __alarm_handler(*zargs):
    """Handle an alarm by calling any due heap entries and resetting the alarm.

    Note that multiple heap entries might get called, especially if calling an
    entry takes a lot of time.
    """
    try:
        nextt = __next_alarm()
        while nextt is not None and nextt <= 0:
            (tm, func, args, keys) = heapq.heappop(alarmlist)
            func(*args, **keys)
            nextt = __next_alarm()
    finally:
        if alarmlist: __set_alarm()


def alarm(sec, func, *args, **keys):
    """Set an alarm.

    When the alarm is raised in `sec` seconds, the handler will call `func`,
    passing `args` and `keys`. Return the heap entry (which is just a big
    tuple), so that it can be cancelled by calling `cancel()`.
    """
    __clear_alarm()
    try:
        newalarm = __new_alarm(sec, func, args, keys)
        heapq.heappush(alarmlist, newalarm)
        return newalarm
    finally:
        __set_alarm()


def cancel(alarm):
    """Cancel an alarm by passing the heap entry returned by `alarm()`.

    It is an error to try to cancel an alarm which has already occurred.
    """
    __clear_alarm()
    try:
        alarmlist.remove(alarm)
        heapq.heapify(alarmlist)
    finally:
        if alarmlist: __set_alarm()
And a usage example:
import alarm
from time import sleep

try:
    with alarm.Timeout(id_='a', seconds=5):
        try:
            with alarm.Timeout(id_='b', seconds=2):
                sleep(3)
        except alarm.TimeoutError as e:
            print('raised', e.id_)
        sleep(30)
except alarm.TimeoutError as e:
    print('raised', e.id_)
else:
    print('nope.')
I faced the same problem, but my situation required working in a sub-thread, where signal doesn't work for me, so I wrote a Python package, timeout-timer, to solve this problem. It supports use as a context manager or a decorator, and uses either a signal or a sub-thread module to trigger the timeout interrupt:
import time
from time import sleep

from timeout_timer import timeout, TimeoutInterrupt

class TimeoutInterruptNested(TimeoutInterrupt):
    pass

def test_timeout_nested_loop_both_timeout(timer="thread"):
    cnt = 0
    try:
        with timeout(5, timer=timer):
            try:
                with timeout(2, timer=timer, exception=TimeoutInterruptNested):
                    sleep(2)
            except TimeoutInterruptNested:
                cnt += 1
            time.sleep(10)
    except TimeoutInterrupt:
        cnt += 1
    assert cnt == 2
Here is a slight improvement of the given thread-based solution.
The code below supports exceptions:
def runFunctionCatchExceptions(func, *args, **kwargs):
    try:
        result = func(*args, **kwargs)
    except Exception as message:
        return ["exception", message]

    return ["RESULT", result]


def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
    import threading

    class InterruptableThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
            self.result = default
        def run(self):
            self.result = runFunctionCatchExceptions(func, *args, **kwargs)

    it = InterruptableThread()
    it.start()
    it.join(timeout_duration)
    if it.is_alive():
        return default

    if it.result[0] == "exception":
        raise it.result[1]

    return it.result[1]
Invoking it with a 5 second timeout:
result = runFunctionWithTimeout(remote_calculate, (myarg,), timeout_duration=5)
Here is a POSIX version that combines many of the previous answers to deliver the following features: it kills any subprocesses spawned by the timed-out function, and it passes objects by reference rather than copying them (as the test cases below demonstrate).
Here is the code and some test cases:
import threading
import signal
import os
import time


class TerminateExecution(Exception):
    """
    Exception to indicate that execution has exceeded the preset running time.
    """


def quit_function(pid):
    # Killing all subprocesses
    os.setpgrp()
    os.killpg(0, signal.SIGTERM)

    # Killing the main thread
    os.kill(pid, signal.SIGTERM)


def handle_term(signum, frame):
    raise TerminateExecution()


def invoke_with_timeout(timeout, fn, *args, **kwargs):
    # Setting a sigterm handler and initiating a timer
    old_handler = signal.signal(signal.SIGTERM, handle_term)
    timer = threading.Timer(timeout, quit_function, args=[os.getpid()])
    terminate = False

    # Executing the function
    timer.start()
    try:
        result = fn(*args, **kwargs)
    except TerminateExecution:
        terminate = True
    finally:
        # Restoring original handler and cancel timer
        signal.signal(signal.SIGTERM, old_handler)
        timer.cancel()

    if terminate:
        raise BaseException("xxx")

    return result


### Test cases

def countdown(n):
    print('countdown started', flush=True)
    for i in range(n, -1, -1):
        print(i, end=', ', flush=True)
        time.sleep(1)
    print('countdown finished')
    return 1337


def really_long_function():
    time.sleep(10)


def really_long_function2():
    os.system("sleep 787")


# Checking that we can run a function as expected.
assert invoke_with_timeout(3, countdown, 1) == 1337

# Testing various scenarios
t1 = time.time()
try:
    print(invoke_with_timeout(1, countdown, 3))
    assert(False)
except BaseException:
    assert(time.time() - t1 < 1.1)
    print("All good", time.time() - t1)

t1 = time.time()
try:
    print(invoke_with_timeout(1, really_long_function2))
    assert(False)
except BaseException:
    assert(time.time() - t1 < 1.1)
    print("All good", time.time() - t1)

t1 = time.time()
try:
    print(invoke_with_timeout(1, really_long_function))
    assert(False)
except BaseException:
    assert(time.time() - t1 < 1.1)
    print("All good", time.time() - t1)

# Checking that classes are referenced and not
# copied (as would be the case with multiprocessing)
class X:
    def __init__(self):
        self.value = 0
    def set(self, v):
        self.value = v

x = X()
invoke_with_timeout(2, x.set, 9)
assert x.value == 9