我想在一个在子线程中调用的函数上添加超时。我不能使用信号,因为信号应该在主线程上。我不能使用thread.join(time_out)
,因为该函数有时可以在几秒钟内执行,在这种情况下,线程将始终等待time_out
.
还有其他方法吗?
资料来源:
thread.join
:在 python 中使用线程的超时功能不起作用signal
:超时功能,如果完成时间过长
我想在一个在子线程中调用的函数上添加超时。我不能使用信号,因为信号应该在主线程上。我不能使用thread.join(time_out)
,因为该函数有时可以在几秒钟内执行,在这种情况下,线程将始终等待time_out
.
还有其他方法吗?
资料来源:
thread.join
:在 python 中使用线程的超时功能不起作用signal
:超时功能,如果完成时间过长您可以使用鲜为人知的ctyps
技巧来提高TimeoutError
针对特定线程的目标。我non_blocking
用这个方法做了一个超时脚本,刚刚在GitHub上发布:https ://github.com/levimluke/PyTimeoutAfter
实现起来超级简单,但技术上很复杂。
def raise_caller(self):
ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(self._caller_thread._ident), ctypes.py_object(self._exception))
if ret == 0:
raise ValueError("Invalid thread ID")
elif ret > 1:
ctypes.pythonapi.PyThreadState_SetAsyncExc(self._caller_thread._ident, NULL)
raise SystemError("PyThreadState_SetAsyncExc failed")
https://github.com/levimluke/PyTimeoutAfter/blob/main/timeout_after.py#L47
我使用一个类对象并将调用线程保存到,这样我就可以从定时子线程中引发父类中的异常。