8

我试图捕获任何服务程序中引发的任何异常,因此我可以确保我只传播已知异常而不是像 ValueError、TypeError 等意外的异常。

我希望能够捕获任何引发的错误,并将它们格式化或将它们转换为其他错误,以更好地控制暴露的信息。

我不想用 try/except 封装每个服务方法。

我尝试过使用拦截器,但我无法捕捉到那里的错误。

有没有办法为 grpc 服务器指定错误处理程序?就像你对烧瓶或任何其他 http 服务器所做的那样?

4

3 回答 3

3

gRPC Python 当前不支持服务器端全局错误处理程序。拦截器不会在函数内部执行服务器处理程序intercept_service,因此无法尝试/除外。

此外,我发现 gRPC Python 服务器拦截器实现与他们在L13-Python-Interceptors.md#server-interceptors提出的原始实现不同。如果实现坚持原来的设计,我们可以很容易地使用拦截器作为全局错误处理程序handlerrequest/ request_iterator

# Current Implementation
intercept_service(self, continuation, handler_call_details)

# Original Design
intercept_unary_unary_handler(self, handler, method, request, servicer_context)
intercept_unary_stream_handler(self, handler, method, request, servicer_context)
intercept_stream_unary_handler(self, handler, method, request_iterator, servicer_context)
intercept_stream_stream_handler(self, handler, method, request_iterator, servicer_context)

请向https://github.com/grpc/grpc/issues提交功能请求问题。

于 2018-11-28T20:27:01.863 回答
2

也许这会对你有所帮助:)

def _wrap_rpc_behavior(handler, fn):
    if handler is None:
        return None

    if handler.request_streaming and handler.response_streaming:
        behavior_fn = handler.stream_stream
        handler_factory = grpc.stream_stream_rpc_method_handler
    elif handler.request_streaming and not handler.response_streaming:
        behavior_fn = handler.stream_unary
        handler_factory = grpc.stream_unary_rpc_method_handler
    elif not handler.request_streaming and handler.response_streaming:
        behavior_fn = handler.unary_stream
        handler_factory = grpc.unary_stream_rpc_method_handler
    else:
        behavior_fn = handler.unary_unary
        handler_factory = grpc.unary_unary_rpc_method_handler

    return handler_factory(fn(behavior_fn,
                              handler.request_streaming,
                              handler.response_streaming),
                           request_deserializer=handler.request_deserializer,
                           response_serializer=handler.response_serializer)


class TracebackLoggerInterceptor(grpc.ServerInterceptor):

    def intercept_service(self, continuation, handler_call_details):
        def latency_wrapper(behavior, request_streaming, response_streaming):

            def new_behavior(request_or_iterator, servicer_context):
                try:
                    return behavior(request_or_iterator, servicer_context)
                except Exception as err:
                    logger.exception(err, exc_info=True)
            return new_behavior

        return _wrap_rpc_behavior(continuation(handler_call_details),    latency_wrapper)
于 2020-01-15T15:50:15.077 回答
1

正如之前的一些评论所建议的那样,我尝试了非常有效的元类方法。

附上一个简单的例子来演示如何拦截grpc调用。您可以通过为元类提供可以应用于每个函数的装饰器列表来扩展它。

此外,明智的做法是对应用包装器的方法更具选择性。一个不错的选择是列出自动生成的基类的方法并只包装它们。

from types import FunctionType
from functools import wraps


def wrapper(method):
    @wraps(method)
    def wrapped(*args, **kwargs):
        # do stuff here
        return method(*args, **kwargs)

    return wrapped


class ServicerMiddlewareClass(type):
    def __new__(meta, classname, bases, class_dict):
        new_class_dict = {}

        for attribute_name, attribute in class_dict.items():
            if isinstance(attribute, FunctionType):
                # replace it with a wrapped version
                attribute = wrapper(attribute)

            new_class_dict[attribute_name] = attribute

        return type.__new__(meta, classname, bases, new_class_dict)


# In order to use
class MyGrpcService(grpc.MyGrpcServicer, metaclass=ServicerMiddlewareClass):
   ...
于 2020-01-08T15:35:27.050 回答