对于调试,判断一个特定函数是否在调用堆栈上更高通常很有用。例如,我们经常只想在某个函数调用我们时运行调试代码。
一种解决方案是检查所有更高的堆栈条目,但这是在堆栈深处并重复调用的函数中,这会导致过多的开销。问题是找到一种方法,使我们能够以合理有效的方式确定特定函数是否在调用堆栈上更高。
相似的
- 从框架对象获取对执行堆栈上的函数对象的引用?- 这个问题的重点是获取函数对象,而不是确定我们是否在一个特定的函数中。尽管可以应用相同的技术,但它们最终可能效率极低。
对于调试,判断一个特定函数是否在调用堆栈上更高通常很有用。例如,我们经常只想在某个函数调用我们时运行调试代码。
一种解决方案是检查所有更高的堆栈条目,但这是在堆栈深处并重复调用的函数中,这会导致过多的开销。问题是找到一种方法,使我们能够以合理有效的方式确定特定函数是否在调用堆栈上更高。
相似的
除非您的目标函数做了一些非常特别的事情来标记“我的一个实例在堆栈上处于活动状态”(IOW:如果该函数是原始且不可触碰的,并且不可能意识到您的这种特殊需求) , 没有可以想象的替代方法可以逐帧遍历堆栈,直到您到达顶部(并且该函数不存在)或您感兴趣的函数的堆栈帧。正如对该问题的一些评论所表明的那样,是否值得努力优化这一点非常值得怀疑。但是,假设为了争论是值得的......:
编辑:最初的答案(由 OP 提供)有很多缺陷,但有些已经修复,所以我正在编辑以反映当前情况以及为什么某些方面很重要。
首先,在装饰器中使用try
/except
或是至关重要with
的,以便正确考虑从被监视函数的任何退出,而不仅仅是正常退出(正如 OP 自己答案的原始版本所做的那样)。
其次,每个装饰者都应该确保它保持被装饰函数的完整__name__
和__doc__
完整——这就是functools.wraps
目的(还有其他方法,但wraps
让它变得最简单)。
第三,与第一点一样重要的是set
,最初由 OP 选择的数据结构 a 是错误的选择:一个函数可以在堆栈上多次(直接或间接递归)。我们显然需要一个“多集”(也称为“袋子”),一种类似集的结构,可以跟踪每个项目出现的“次数”。在 Python 中,多重集的自然实现是作为 dict 将键映射到计数,而后者又最容易实现为collections.defaultdict(int)
.
第四,一般的方法应该是线程安全的(至少当这可以很容易地实现时;-)。幸运的是threading.local
,在适用的情况下,它变得微不足道——在这里,它肯定应该是(每个堆栈都有自己独立的调用线程)。
第五,在一些评论中提出了一个有趣的问题(注意在某些答案中提供的装饰器与其他装饰器的关系有多么糟糕:监控装饰器似乎必须是最后一个(最外面的)装饰器,否则检查会中断。这来自使用函数对象本身作为监控字典的键的自然但不幸的选择。
我建议通过选择不同的键来解决这个问题:让装饰器采用一个(例如,字符串)identifier
参数,该参数必须是唯一的(在每个给定线程中),并使用标识符作为监控字典的键。检查堆栈的代码当然必须知道标识符并使用它。
在装饰时,装饰器可以检查唯一性属性(通过使用单独的集合)。标识符可以保留为函数名的默认值(因此仅明确要求保持在同一命名空间中监视同名函数的灵活性);当出于监视目的而将多个受监视功能视为“相同”时,可以明确放弃唯一性属性(如果给定的可能是这种情况)def
语句意味着在稍微不同的上下文中多次执行,以生成程序员想要将其视为“相同函数”以用于监视目的的多个函数对象)。最后,对于那些已知不可能进一步装饰的罕见情况(因为在这些情况下,这可能是保证唯一性的最简便方法),应该可以选择性地恢复为“作为标识符的函数对象”。
因此,将这些考虑因素放在一起,我们可以拥有(包括一个threadlocal_var
可能已经在工具箱模块中的实用程序函数;-)类似于以下内容......:
import collections
import functools
import threading
threadlocal = threading.local()
def threadlocal_var(varname, factory, *a, **k):
v = getattr(threadlocal, varname, None)
if v is None:
v = factory(*a, **k)
setattr(threadlocal, varname, v)
return v
def monitoring(identifier=None, unique=True, use_function=False):
def inner(f):
assert (not use_function) or (identifier is None)
if identifier is None:
if use_function:
identifier = f
else:
identifier = f.__name__
if unique:
monitored = threadlocal_var('uniques', set)
if identifier in monitored:
raise ValueError('Duplicate monitoring identifier %r' % identifier)
monitored.add(identifier)
counts = threadlocal_var('counts', collections.defaultdict, int)
@functools.wraps(f)
def wrapper(*a, **k):
counts[identifier] += 1
try:
return f(*a, **k)
finally:
counts[identifier] -= 1
return wrapper
return inner
我没有测试过这段代码,所以它可能包含一些错字或类似的东西,但我提供它是因为我希望它确实涵盖了我上面解释的所有重要技术点。
这一切都值得吗?可能不是,如前所述。然而,我认为“如果它值得做,那么它值得做对”;-)。
我不太喜欢这种方法,但这是您正在做的固定版本:
from collections import defaultdict
import threading
functions_on_stack = threading.local()
def record_function_on_stack(f):
def wrapped(*args, **kwargs):
if not getattr(functions_on_stack, "stacks", None):
functions_on_stack.stacks = defaultdict(int)
functions_on_stack.stacks[wrapped] += 1
try:
result = f(*args, **kwargs)
finally:
functions_on_stack.stacks[wrapped] -= 1
if functions_on_stack.stacks[wrapped] == 0:
del functions_on_stack.stacks[wrapped]
return result
wrapped.orig_func = f
return wrapped
def function_is_on_stack(f):
return f in functions_on_stack.stacks
def nested():
if function_is_on_stack(test):
print "nested"
@record_function_on_stack
def test():
nested()
test()
这处理递归、线程和异常。
我不喜欢这种方法有两个原因:
更好的方法是直接检查堆栈(可能作为速度的本机扩展),如果可能,找到一种方法在堆栈帧的生命周期内缓存结果。(不过,我不确定在不修改 Python 核心的情况下是否可行。)