3

我创建了一个简单的服务器拦截器,它根据 JWT 令牌检索用户。

但现在我想让它可用于我服务的所有方法。

目前我正在使用装饰器。但我想避免不得不装饰所有的方法。以防万一,只装饰那些不需要用户的。

有人可以给我一个线索吗?

这是我的代码:

class AuthInterceptor(grpc.ServerInterceptor):
"""Authorization Interceptor"""

def __init__(self, loader):
    self._loader = loader

def intercept_service(self, continuation, handler_call_details):
    # Authenticate if we not requesting token.
    if not handler_call_details.method.endswith('GetToken'):
        # My Authentication class.
        auth = EosJWTAuth()
        # Authenticate using the headers tokens to get the user.
        user = auth.authenticate(
            dict(handler_call_details.invocation_metadata))[0]

        # Do something here to pass the authenticated user to the functions.

    cont = continuation(handler_call_details)
    return cont

而且我希望我的方法能够以这样的方式访问用户。

class UserService(BaseService, users_pb2_grpc.UserServicer):
"""User service."""

def get_age(self, request, context):
    """Get user's age"""
    user = context.get_user()
    # or user = context.user 
    # or user = self.user 
    # os user = request.get_user() 
    return pb.UserInfo(name=user.name, age=user.age)
4

1 回答 1

2

这是 Web 服务器的常见需求,最好将装饰器添加到处理程序以显式设置身份验证/授权要求。它有助于提高可读性,并降低整体复杂性。

但是,这是解决您的问题的解决方法。它使用 Python 元类来自动装饰每个服务方法。

import grpc
import functools
import six

def auth_decorator(func):
    @functools.wraps(func)
    def wrapper(request, context):
        if not func.__name__.endswith('GetToken'):
            auth = FooAuthClass()
            try:
                user = auth.authenticate(
                    dict(context.invocation_metadata)
                )[0]
                request.user = user
            except UserNotFound:
                context.abort(
                    grpc.StatusCode.UNAUTHENTICATED,
                    'Permission denied.',
                )
        return func(request, context)

    return wrapper

class AuthMeta:

    def __new__(self, class_name, bases, namespace):
        for key, value in list(namespace.items()):
            if callable(value):
                namespace[key] = auth_decorator(value)
        return type.__new__(self, class_name, bases, namespace)

class BusinessServer(FooServicer, six.with_metaclass(AuthMeta)):

    def LogicA(self, request, context):
        # request.user accessible
        ...

    def LogicGetToken(self, request, context):
        # request.user not accessible
        ...
于 2019-02-27T19:22:30.763 回答