我试图捕获任何服务程序中引发的任何异常,因此我可以确保我只传播已知异常而不是像 ValueError、TypeError 等意外的异常。
我希望能够捕获任何引发的错误,并将它们格式化或将它们转换为其他错误,以更好地控制暴露的信息。
我不想用 try/except 封装每个服务方法。
我尝试过使用拦截器,但我无法捕捉到那里的错误。
有没有办法为 grpc 服务器指定错误处理程序?就像你对烧瓶或任何其他 http 服务器所做的那样?
我试图捕获任何服务程序中引发的任何异常,因此我可以确保我只传播已知异常而不是像 ValueError、TypeError 等意外的异常。
我希望能够捕获任何引发的错误,并将它们格式化或将它们转换为其他错误,以更好地控制暴露的信息。
我不想用 try/except 封装每个服务方法。
我尝试过使用拦截器,但我无法捕捉到那里的错误。
有没有办法为 grpc 服务器指定错误处理程序?就像你对烧瓶或任何其他 http 服务器所做的那样?
gRPC Python 当前不支持服务器端全局错误处理程序。拦截器不会在函数内部执行服务器处理程序intercept_service
,因此无法尝试/除外。
此外,我发现 gRPC Python 服务器拦截器实现与他们在L13-Python-Interceptors.md#server-interceptors提出的原始实现不同。如果实现坚持原来的设计,我们可以很容易地使用拦截器作为全局错误处理程序handler
和request
/ request_iterator
。
# Current Implementation
intercept_service(self, continuation, handler_call_details)
# Original Design
intercept_unary_unary_handler(self, handler, method, request, servicer_context)
intercept_unary_stream_handler(self, handler, method, request, servicer_context)
intercept_stream_unary_handler(self, handler, method, request_iterator, servicer_context)
intercept_stream_stream_handler(self, handler, method, request_iterator, servicer_context)
也许这会对你有所帮助:)
def _wrap_rpc_behavior(handler, fn):
if handler is None:
return None
if handler.request_streaming and handler.response_streaming:
behavior_fn = handler.stream_stream
handler_factory = grpc.stream_stream_rpc_method_handler
elif handler.request_streaming and not handler.response_streaming:
behavior_fn = handler.stream_unary
handler_factory = grpc.stream_unary_rpc_method_handler
elif not handler.request_streaming and handler.response_streaming:
behavior_fn = handler.unary_stream
handler_factory = grpc.unary_stream_rpc_method_handler
else:
behavior_fn = handler.unary_unary
handler_factory = grpc.unary_unary_rpc_method_handler
return handler_factory(fn(behavior_fn,
handler.request_streaming,
handler.response_streaming),
request_deserializer=handler.request_deserializer,
response_serializer=handler.response_serializer)
class TracebackLoggerInterceptor(grpc.ServerInterceptor):
def intercept_service(self, continuation, handler_call_details):
def latency_wrapper(behavior, request_streaming, response_streaming):
def new_behavior(request_or_iterator, servicer_context):
try:
return behavior(request_or_iterator, servicer_context)
except Exception as err:
logger.exception(err, exc_info=True)
return new_behavior
return _wrap_rpc_behavior(continuation(handler_call_details), latency_wrapper)
正如之前的一些评论所建议的那样,我尝试了非常有效的元类方法。
附上一个简单的例子来演示如何拦截grpc调用。您可以通过为元类提供可以应用于每个函数的装饰器列表来扩展它。
此外,明智的做法是对应用包装器的方法更具选择性。一个不错的选择是列出自动生成的基类的方法并只包装它们。
from types import FunctionType
from functools import wraps
def wrapper(method):
@wraps(method)
def wrapped(*args, **kwargs):
# do stuff here
return method(*args, **kwargs)
return wrapped
class ServicerMiddlewareClass(type):
def __new__(meta, classname, bases, class_dict):
new_class_dict = {}
for attribute_name, attribute in class_dict.items():
if isinstance(attribute, FunctionType):
# replace it with a wrapped version
attribute = wrapper(attribute)
new_class_dict[attribute_name] = attribute
return type.__new__(meta, classname, bases, new_class_dict)
# In order to use
class MyGrpcService(grpc.MyGrpcServicer, metaclass=ServicerMiddlewareClass):
...