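I'd like to be able to use django-guardian.
But I want to add a layer of logic around those permissions. For example, if someone has the edit_book permission on a Book, then their permission to edit the Pages in that book should be implicit. The rules package seems ideal.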
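tl;dr Yes, they can, and we can solve some of the scalability problems of Rules, but you can't get around running a query for every object, so permission-filtered queries are expensive. A hybrid but more complex solution is proposed at the bottom: it uses lazy Q-like objects and compiles lazy rule sets into SQL at runtime.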
The following seems to work:

import rules
import guardian

@rules.predicate
def is_page_book_editor(user, page):
    return user.has_perm('books.edit_book', page.book)

@rules.predicate
def is_page_editor(user, page):
    return user.has_perm('pages.edit_page', page)

rules.add_perm('pages.can_edit_page', is_page_book_editor | is_page_editor)
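Then to check:

joe.has_perm('pages.can_edit_page', page34)

Or:

from rules.contrib.views import permission_required, objectgetter

@permission_required('pages.can_edit_page', fn=objectgetter(Page, 'page_id'))
def post_update(request, page_id):
    # ...

With the authentication backends defined:
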
AUTHENTICATION_BACKENDS = (
    'rules.permissions.ObjectPermissionBackend',
    'django.contrib.auth.backends.ModelBackend',
    'guardian.backends.ObjectPermissionBackend',
)

Imports:

from django.contrib.auth.models import User
import rules
import guardian
from guardian.shortcuts import assign_perm
from myapp.models import Book, Page

Testing:

joe = User.objects.create(username='joe', email='joe@example.com')
page23 = Page.objects.get(id=123)  # get(), not filter(): we need one Page, not a queryset
assign_perm('edit_page', joe, page23)
joe.has_perm('pages.edit_page', page23)
is_page_editor(joe, page23)  # returns True
joe.has_perm('pages.can_edit_page', page23)  # returns True
rules.remove_perm('pages.can_edit_page')
rules.add_perm('pages.can_edit_page', is_page_book_editor & is_page_editor)
joe.has_perm('pages.can_edit_page', page23)  # returns False
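
To see why swapping | for & flips the last result, here is a minimal stand-alone sketch of predicate composition. It does not use the rules package: the Predicate class, the GRANTED set and the BOOK_OF mapping are hypothetical stand-ins, assumed only for illustration.

```python
class Predicate:
    """Hypothetical stand-in for a composable predicate (not the rules API)."""

    def __init__(self, fn):
        self.fn = fn

    def __call__(self, user, obj):
        return self.fn(user, obj)

    def __or__(self, other):
        # Either side granting is enough.
        return Predicate(lambda u, o: self(u, o) or other(u, o))

    def __and__(self, other):
        # Both sides must grant.
        return Predicate(lambda u, o: self(u, o) and other(u, o))


# Pretend object-permission store: (user, codename, object) triples.
GRANTED = {("joe", "edit_page", "page23")}
# Pretend page -> book relation.
BOOK_OF = {"page23": "book1"}

is_page_editor = Predicate(lambda u, page: (u, "edit_page", page) in GRANTED)
is_page_book_editor = Predicate(
    lambda u, page: (u, "edit_book", BOOK_OF[page]) in GRANTED
)

either = is_page_book_editor | is_page_editor
both = is_page_book_editor & is_page_editor

# joe holds the page permission only, so OR passes and AND fails.
result_either = either("joe", "page23")
result_both = both("joe", "page23")
```

With only the page-level permission assigned, the OR rule passes and the AND rule fails, which matches the True and False results in the test session.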

One problem with this is that every predicate hits the database each time a rule is checked. The following adds caching, so that there is only one query per rule check:

@rules.predicate
def is_page_book_viewer(user, instance):
    if is_page_book_viewer.context.get('user_perms') is None:
        is_page_book_viewer.context['user_perms'] = guardian.shortcuts.get_perms(user, instance.book)
    return 'view_book' in is_page_book_viewer.context.get('user_perms')

@rules.predicate(bind=True)
def is_page_viewer(self, user, instance):
    if self.context.get('user_perms') is None:
        self.context['user_perms'] = guardian.shortcuts.get_perms(user, instance)
    return 'view_page' in self.context.get('user_perms')

(I bind and use self in the second example, but this is equivalent to using the predicate name.)

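When you are doing complex composite permissions, it is probably wise to replace django-guardian's generic foreign keys with real ones that the database can optimize and index, like so:

from django.db import models
from guardian.models import GroupObjectPermissionBase, UserObjectPermissionBase

class PageUserObjectPermission(UserObjectPermissionBase):
    content_object = models.ForeignKey(Page, on_delete=models.CASCADE)

class PageGroupObjectPermission(GroupObjectPermissionBase):
    content_object = models.ForeignKey(Page, on_delete=models.CASCADE)

class BookUserObjectPermission(UserObjectPermissionBase):
    content_object = models.ForeignKey(Book, on_delete=models.CASCADE)

class BookGroupObjectPermission(GroupObjectPermissionBase):
    content_object = models.ForeignKey(Book, on_delete=models.CASCADE)
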
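There is a bug. We are caching the permissions on Page and on Book in the same place; we need to distinguish them and cache them separately. Also, let's encapsulate the repeated code into its own method. Finally, let's give get() a default value, to make sure we don't re-query a user's permissions when they have None.
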
def cache_permissions(predicate, user, instance):
    """
    Cache all permissions this user has on this instance, for potential reuse by other predicates in this rule check.
    """
    key = 'user_%s_perms_%s_%s' % (user.pk, type(instance).__name__, instance.pk)
    if predicate.context.get(key, -1) == -1:
        predicate.context[key] = guardian.shortcuts.get_perms(user, instance)
    return predicate.context[key]

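This way object permissions are cached separately. (Including the user ID in the key is unnecessary, as any rule check only ever involves one user, but it is a little more future-proof.)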
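
The effect of the per-object cache key can be checked without a database. In this sketch a plain dict plays the role of predicate.context and fake_get_perms is a hypothetical stand-in for guardian.shortcuts.get_perms that records each lookup; users and objects are plain strings, so the key uses the values directly instead of .pk.

```python
calls = []


def fake_get_perms(user, instance):
    """Hypothetical stand-in for guardian.shortcuts.get_perms; records each lookup."""
    calls.append((user, instance))
    return ["view_book"] if instance == "book1" else []


def cache_permissions(context, user, instance):
    # Same keying idea as above: one cache entry per (user, type, object).
    key = "user_%s_perms_%s_%s" % (user, type(instance).__name__, instance)
    if context.get(key, -1) == -1:
        context[key] = fake_get_perms(user, instance)
    return context[key]


context = {}  # plays the role of predicate.context during one rule check

first = "view_book" in cache_permissions(context, "joe", "book1")
second = "edit_book" in cache_permissions(context, "joe", "book1")  # cache hit
third = "view_page" in cache_permissions(context, "joe", "page23")  # new object
fourth = "view_page" in cache_permissions(context, "joe", "page23")  # empty list is cached too

lookup_count = len(calls)
```

Four checks cost two lookups, one per object. The -1 default matters for page23: an empty permission list is still a cached answer and is not fetched again.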

Then we can define our predicates as follows:

@rules.predicate(bind=True)
def is_page_book_viewer(self, user, instance: Page):
    return 'view_book' in cache_permissions(self, user, instance.book)

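One limitation of rules is that permission checks have to be made one at a time, per user and per object, yet we often need all the objects on which a user has a given permission. For example, to get a list of all the pages the user may edit, I have to run [p for p in Pages.objects.all() if usr.has_perm('can_edit_page', p)], which checks each page in turn, rather than having usr.has_perm('can_edit_page') return all permitted objects in one query.

We can't fully address this limitation, but where we don't need to check every object in a list, we can reduce the number of queries by using next and lazy, generator-based querysets. In the example above, we could use (...) rather than [...] if we may not reach the end of the list, and next(...) if we only need to know whether any object in the list has the permission. break or return are the equivalents in ordinary looping code, as in the code further down.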
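
The difference between the list and the generator form can be shown with a small stand-alone sketch. The has_perm function here is a hypothetical per-object check that records which pages it examined; it stands in for usr.has_perm and is not a Django call.

```python
checked = []


def has_perm(user, page):
    """Hypothetical per-object check; records which pages were examined."""
    checked.append(page)
    return page in {"p2", "p5"}


pages = ["p1", "p2", "p3", "p4", "p5"]

# List comprehension: every page is checked.
all_allowed = [p for p in pages if has_perm("usr", p)]
checks_for_list = len(checked)

checked.clear()

# Generator expression plus next(): stops at the first permitted page.
first_allowed = next((p for p in pages if has_perm("usr", p)), None)
checks_for_next = len(checked)
```

The list form runs five checks; the next() form stops after two. The second argument to next() is the value returned when no page is permitted, which avoids a StopIteration.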

I have a situation where a model has a self-join hierarchy, and I only need to know whether any of a model's descendants has a permission. The code must recursively query the table for the descendants of successive nodes, but as soon as we find an object with the permission, we don't need to query any further. I have done this as follows. (Note that I'm interested in whether anyone at all has a permission on an object, and that I've specified non-generic keys. If you are checking permission for a particular user, you can use user.has_perm('perm_name', obj) to apply your Rules.)

class Foo(models.Model):
    parent = models.ForeignKey('Foo', blank=True, null=True, on_delete=models.CASCADE)

    def descendants(self):
        """
        When callers don't need the complete list (eg, checking if any dependent is
        viewable by any user), we run fewer queries by only going into the dependent
        hierarchy as much as necessary.
        """
        immediate_descendants = Foo.objects.filter(parent=self)
        for x in immediate_descendants:
            yield x
        for x in immediate_descendants:
            for y in x.descendants():
                yield y

    def obj_or_descendant_has_perm(self, perm_code):
        perm_id = Permission.objects.get(codename=perm_code).id
        if FooUserObjectPermission.objects.filter(permission_id=perm_id,
                                                  content_object=self).exists():
            return True
        if FooGroupObjectPermission.objects.filter(permission_id=perm_id,
                                                   content_object=self).exists():
            return True
        for o in self.descendants():
            # Check the descendant o, not self again
            if FooUserObjectPermission.objects.filter(permission_id=perm_id,
                                                      content_object=o).exists():
                return True
            if FooGroupObjectPermission.objects.filter(permission_id=perm_id,
                                                       content_object=o).exists():
                return True
        return False

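If you have a self-join that is this simple, check out treebeard for more efficient ways of modelling hierarchies (materialized paths, nested sets or adjacency lists). In my case, the self-join went through other tables, so this wasn't possible.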
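
The saving from the lazy traversal can be illustrated without Django. In this sketch the CHILDREN dict is a hypothetical in-memory stand-in for the self-joined table, and children_of plays the role of one Foo.objects.filter(parent=...) query, logging each call.

```python
# Hypothetical in-memory hierarchy standing in for the self-joined table.
CHILDREN = {
    "root": ["a", "b"],
    "a": ["a1", "a2"],
    "b": ["b1"],
    "a1": [], "a2": [], "b1": [],
}
queries = []


def children_of(node):
    """Stands in for one database query for the children of a node."""
    queries.append(node)
    return CHILDREN[node]


def descendants(node):
    # Yield the immediate children first, then recurse only if the caller
    # keeps asking for more.
    immediate = children_of(node)
    for child in immediate:
        yield child
    for child in immediate:
        yield from descendants(child)


def any_descendant_has_perm(node, permitted):
    for d in descendants(node):
        if d in permitted:
            return True  # early exit: no further queries are issued
    return False


found = any_descendant_has_perm("root", {"a1"})
queries_when_found = list(queries)

queries.clear()
missing = any_descendant_has_perm("root", {"zzz"})
queries_when_missing = list(queries)
```

When a permitted node is found early, only two of the six possible queries run; the full cost is paid only when nothing matches.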

I went a step further and allowed group selects by returning querysets from descendants:

class Foo(models.Model):
    parent = models.ForeignKey('Foo', blank=True, null=True, on_delete=models.CASCADE)

    def descendants(self):
        """
        When callers don't need the complete list (eg, checking if any dependent is
        viewable by any user), we run fewer queries by only going into the dependent
        hierarchy as much as necessary. Returns a generator of querysets of Foo objects.
        """
        immediate_descendants = Foo.objects.filter(parent=self)
        yield immediate_descendants
        for x in immediate_descendants:
            for y in x.descendants():
                yield y

    def obj_or_descendant_has_perm(self, perm_code):
        perm_id = Permission.objects.get(codename=perm_code).id
        if FooUserObjectPermission.objects.filter(permission_id=perm_id,
                                                  content_object=self).exists():
            return True
        if FooGroupObjectPermission.objects.filter(permission_id=perm_id,
                                                   content_object=self).exists():
            return True
        for gen in self.descendants():
            # gen is a queryset of siblings, checked in one query with __in
            if FooUserObjectPermission.objects.filter(permission_id=perm_id,
                                                      content_object__in=gen).exists():
                return True
            if FooGroupObjectPermission.objects.filter(permission_id=perm_id,
                                                       content_object__in=gen).exists():
                return True
        return False

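Unfortunately, you can only prefetch and cache per object, not for all the objects in a query, because django-guardian generates a SQL sub-query for the Rules layer each time an object is checked, and you can't pre-cache that. Also, registering two permission backends in Django means that both are checked every time, which could lead to accidental approval if permission names are duplicated.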
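
The accidental-approval risk follows from how Django consults its backends: each one is asked in turn, and the first to return True grants the permission. The sketch below is a simplified model of that loop, not Django's code (it omits the PermissionDenied short-circuit, for example); the two backend classes are hypothetical stand-ins.

```python
class RulesLikeBackend:
    """Hypothetical backend whose rule denies everything in this example."""

    def has_perm(self, user, perm, obj=None):
        return False


class GuardianLikeBackend:
    """Hypothetical backend holding an object permission named pages.edit_page."""

    def has_perm(self, user, perm, obj=None):
        return perm == "pages.edit_page"


def user_has_perm(backends, user, perm, obj=None):
    # Simplified version of the backend loop: any single True is enough.
    for backend in backends:
        if backend.has_perm(user, perm, obj):
            return True
    return False


backends = [RulesLikeBackend(), GuardianLikeBackend()]

# Same name in both layers: the rule's denial is overridden by the other backend.
shared_name = user_has_perm(backends, "joe", "pages.edit_page", "page23")
# Distinct rule name: only the rules-like backend has an opinion.
distinct_name = user_has_perm(backends, "joe", "pages.can_edit_page", "page23")
```

This is one reason the earlier examples give the rule a name (can_edit_page) that differs from the underlying guardian permission (edit_page).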

So I went with something similar to Rules, but which is lazily compiled into a queryset filter on execution. Here is a contrived example that demonstrates combining Guardian permissions, Django Q objects and lazily evaluated Q objects into a rule with boolean operators:

add_rule('kimsim_app.model_run.view',
         LazyGuardianPermission('kimsim_app.view_model') &
         (
             LazyGuardianPermission('kimsim_app.saved_model') |
             LazyGuardianPermission('kimsim_app.saved_model')
         )
         & ~LazyQ('modelgroupobjectpermission__group__user', 'request.user')
         & ~Q(number_of_failures__lte=42)
         )
lazy = LazyPermission('kimsim_app.model_run.view') & ~LazyGuardianPermission('kimsim_app.view_model')

Then, to get all the 'models' the user has access to:

Model.objects.filter(lazy.convert_to_q(user=u))

To determine whether a user has access to model m (the resulting queryset contains m if so, and is empty otherwise):

Model.objects.filter(lazy.convert_to_q(user=u, obj=m))
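
The deferred-evaluation idea behind convert_to_q can be sketched in plain Python. This is not the author's implementation: the Lazy class and the row dicts are hypothetical, and the resolved rule filters an in-memory list, whereas the real classes produce Q objects that the database evaluates in SQL.

```python
class Lazy:
    """Hypothetical lazy node: stores how to build a test, not the test itself."""

    def __init__(self, build):
        self.build = build  # callable(**context) -> function(row) -> bool

    def __and__(self, other):
        def build(**ctx):
            left, right = self.build(**ctx), other.build(**ctx)
            return lambda row: left(row) and right(row)
        return Lazy(build)

    def __or__(self, other):
        def build(**ctx):
            left, right = self.build(**ctx), other.build(**ctx)
            return lambda row: left(row) or right(row)
        return Lazy(build)

    def __invert__(self):
        def build(**ctx):
            inner = self.build(**ctx)
            return lambda row: not inner(row)
        return Lazy(build)


# Leaves: one needs a runtime value (the user), the others do not.
owned_by_user = Lazy(lambda **ctx: (lambda row: row["owner"] == ctx["user"]))
is_public = Lazy(lambda **ctx: (lambda row: row["public"]))
too_many_failures = Lazy(lambda **ctx: (lambda row: row["failures"] > 42))

# Declared once, before any user is known.
rule = owned_by_user | (is_public & ~too_many_failures)

rows = [
    {"id": 1, "owner": "joe", "public": False, "failures": 0},
    {"id": 2, "owner": "ann", "public": True, "failures": 3},
    {"id": 3, "owner": "ann", "public": True, "failures": 99},
    {"id": 4, "owner": "ann", "public": False, "failures": 0},
]

# Resolved per request, once the user is known.
visible_to_joe = [r["id"] for r in rows if rule.build(user="joe")(r)]
visible_to_ann = [r["id"] for r in rows if rule.build(user="ann")(r)]
```

The same rule tree yields different concrete filters for different users, which is the property that lets one declaration serve both list filtering and single-object checks.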

This can then be built into permission backends for DRF, the Django admin, template tags, and so on. The code:

import logging

from django.contrib.auth.models import Permission
from django.db.models import Q
from guardian.utils import get_group_obj_perms_model, get_user_obj_perms_model

# LazyPermDeclarationError and default_rules are referenced below but are not shown in this listing.


class BaseLazyQ(Q):
    """
    Is the type instantiated by the Q library when it parses the operators linking Q, LazyQ, LazyPermission and other
    BaseLazyQ subclasses, and generates the 'lazy' Q node tree for a rule.
    """

    def __init__(self, *args, **kwargs):
        # If no args, this is a connector node joining two sub-clauses
        # Or if args[0] is not a string, this is a standard, fully declared Q object
        super(BaseLazyQ, self).__init__(*args, **kwargs)
        logging.info('Instantiated fully declared BaseLazyQ %s, children %s', self.connector, self.children)

    # These two overrides force the connector nodes to be of type `BaseLazyQ`, ie, `LazyQ() & LazyGuardian()`
    # creates a BaseLazyQ connector node. These are then simply cloned on calling `convert_to_q()`.
    # Sub-classes do different conversion processing in `convert_to_q`.
    # They also disable `squash`, as child Q objects have not been instantiated yet: 'lazy' Q or
    # guardian permissions are still awaiting values request & obj.
    def _combine(self, other, conn):
        if not isinstance(other, Q):
            raise TypeError(other)
        obj = BaseLazyQ()
        obj.connector = conn
        obj.add(self, conn, squash=False)
        obj.add(other, conn, squash=False)
        return obj

    def __invert__(self):
        obj = BaseLazyQ()
        obj.add(self, self.AND, squash=False)
        obj.negate()
        return obj

    def convert_to_q(self, *args, **kwargs):
        """
        Generates a tree of fully specified Q() objects at run time from our tree of lazy Q, Guardian and LazyPermission
        objects, by passing them the ``request`` and ``obj`` objects of the current request.
        Note that only kwargs ``request`` or ``obj`` can be used if you will integrate with Django ModelAdmin and
        django-restframework permissions classes.
        :param request: From the current request
        :param obj: Optional - the object permissions are being tested for, if this is object-specific.
        :return: A tree of Q() objects that can be applied to a queryset of type ``obj``
        """
        logging.info('Converting fully declared BaseLazyQ conn %s children %s', self.connector, self.children)
        q = self.__class__._new_instance(children=[], connector=self.connector, negated=self.negated)
        for predicate in self.children:
            if isinstance(predicate, BaseLazyQ):
                # Including subclasses
                q.children.append(predicate.convert_to_q(*args, **kwargs))
            else:
                # Q or Node
                q.children.append(predicate.clone())
                logging.info('Cloning child Q %s', predicate)
        return q


class AlwaysQ(BaseLazyQ):
    """
    This class is used for permissions that are always granted or denied regardless of user, request, object, etc.
    """

    def __init__(self, always_allow, *args, **kwargs):
        """
        Initializes a class which always permits or denies a particular permission. Still subject to boolean operators,
        ie, `AlwaysQ('allow') & [some failing test/s]` will refuse permission.
        Likewise `AlwaysQ('deny') | [some passing test/s]` will grant permission.
        :param always_allow: Must be set to `'allow'` to always allow, or `'deny'` to always deny.
        """
        super(AlwaysQ, self).__init__(*args, **kwargs)
        if always_allow not in ['allow', 'deny']:
            raise LazyPermDeclarationError('AlwaysQ must be declared as either \'allow\' or \'deny\'.')
        self.always_allow = always_allow

    def convert_to_q(self, *args, **kwargs):
        # always_allow is the string 'allow' or 'deny'; both are truthy, so compare explicitly.
        # 'allow' -> pk__isnull=False (matches every row); 'deny' -> pk__isnull=True (matches none).
        return Q(pk__isnull=(self.always_allow != 'allow'))


class LazyQ(BaseLazyQ):
    def __init__(self, *args, **kwargs):
        super(LazyQ, self).__init__(*args, **kwargs)
        if args and len(args) == 2 and isinstance(args[0], str) and isinstance(args[1], str):
            logging.info('Instantiating LazyQ %s %s', args[0], args[1])
            self.field = args[0]
            attrs = args[1].split('.')
            self.parameter = attrs[0]
            self.attributes = attrs[1:]
        else:
            raise LazyPermDeclarationError('LazyQ must be declared with a Q query string and the naming of the '
                                           'parameter attributes to assign it.')

    def convert_to_q(self, *args, **kwargs):
        """
        Generates a tree of fully specified Q() objects at run time, from our tree of lazy LazyPermission() and LazyQ()
        objects, by passing them the ``request`` and ``obj`` objects of the current request.
        Note that only kwargs ``request`` or ``obj`` can be used if we are to integrate with Django ModelAdmin and
        django-restframework permissions classes.
        :param request: From the current request
        :param obj: Optional - the object permissions are being tested for, if this is object-specific.
        :return: A tree of Q() objects that can be applied to a queryset of type ``obj``
        """
        logging.info('Converting LazyQ conn %s children %s args %s kwargs %s', self.connector, self.children, args, kwargs)
        value = kwargs[self.parameter]
        for attr in self.attributes:
            value = getattr(value, attr)
            logging.info('attr %s = %s', attr, value)
        return Q((self.field, value))


class LazyGuardianPermission(BaseLazyQ):
    """
    This class supports lazy guardian permissions, whose request and obj are to be passed at runtime.
    """

    def __init__(self, permission, globals_override=False, use_groups=True, related_object=None, *args, **kwargs):
        """
        Instantiates a lazy guardian permission that can later be converted to fully defined Q objects when passed
        request and (optionally) obj at request time.
        :param permission: The fully qualified guardian permission name, including the app label, eg, app.action_model
        :param globals_override: If 'allow', if the user has the permission on the model, then they have the
        permission on every object. It is not possible to disable global permissions inherited through group ownership.
        If 'deny', the user must have *both* the global permission, and the permission on the object. Removing the
        global permission for a user effectively removes their permission on all that model's objects.
        It is not possible to disable global permissions inherited through group ownership, and so only use those
        allocated to a user, ie, use_groups has no effect and is always True for global permission checks, as they are
        provided by the Django auth ModelBackend.
        Default False, which means global permissions are ignored.
        :param use_groups: If False, permissions a user has by group membership will not be considered.
        Default True, which means this check will check the permissions of groups the user is in. Note that this does
        not affect the `allow_groups` option, or checks that are not object-specific. These will always include group
        permissions, as determined by the Django auth ModelBackend.
        :param related_object: If the guardian permission is on a model related to the current one, this is the
        query string path from the current model to that model.
        """
        logging.info('Instantiating LazyGuardianPermission %s', permission)
        super(LazyGuardianPermission, self).__init__(*args, **kwargs)
        if isinstance(permission, str):
            perm_elems = permission.split('.')
            if len(perm_elems) == 2:
                # This specifies a guardian permission
                self.app_label = perm_elems[0]
                try:
                    self.permission = Permission.objects.select_related('content_type')\
                        .get(content_type__app_label=perm_elems[0],
                             codename=perm_elems[1])
                except Permission.DoesNotExist:
                    raise LazyPermDeclarationError('Guardian permission %s not found. LazyGuardianPermission must be '
                                                   'passed a fully qualified guardian permission, eg, '
                                                   'app.action_model. Q, LazyQ or LazyPermission objects can also be '
                                                   'used.' % permission)
                self.related_object = related_object
                self.use_groups = use_groups
                self.globals_override = globals_override
                self.model_cls = self.permission.content_type.model_class()
            else:
                raise LazyPermDeclarationError('Guardian permission %s not found. LazyGuardianPermission must contain '
                                               'a fully qualified guardian permission, eg, app.action_model. Q, LazyQ '
                                               'or LazyPermission objects can also be used.' % permission)
        else:
            raise LazyPermDeclarationError('LazyGuardianPermission must be declared with a fully qualified guardian '
                                           'permission name, eg, app.action_model. <%s> not a valid parameter.' %
                                           str(permission))

    def convert_to_q(self, user, obj=None):
        """
        Generates a tree of fully specified Q() objects at run time to test this Guardian permission, by passing them
        the `user` and `obj` objects of the current request.
        :param user: From the current request
        :param obj: Optional - the object permissions are being tested for, if this is object-specific.
        :return: A tree of Q() objects that can be applied to a queryset of type ``obj``
        """
        logging.info('Converting LazyGuardianPermission %s%s', '~' if self.negated else '', self.permission.codename)
        if self.globals_override:
            has_global = user.has_perm('%s.%s' % (self.app_label, self.permission.codename))
            if has_global and self.globals_override == 'allow':
                return Q(pk__isnull=False)
            elif not has_global and self.globals_override == 'deny':
                return Q(pk__isnull=True)
        related_object_prefix = '%s__' % self.related_object if self.related_object else ''
        user_obj_perms_model = get_user_obj_perms_model(self.model_cls)
        group_obj_perms_model = get_group_obj_perms_model(self.model_cls)
        # logging.info('%s %s %s', self.model_cls, user_obj_perms_model, user_obj_perms_model.objects)
        if user_obj_perms_model.objects.is_generic():
            raise LazyPermDeclarationError('%s appears to be using generic foreign keys. LazyPermissions '
                                           'does not support Guardian permissions maintained via generic '
                                           'foreign keys, and insists you specify a custom table joining '
                                           'object, permission and user, for example `class '
                                           'DatasetUserObjectPermission(UserObjectPermissionBase): '
                                           'content_object = models.ForeignKey(Dataset)` and likewise '
                                           'for Groups. This is also more performant and maintains '
                                           'referential integrity.' % self.permission)
        user_obj_perms_model_ref = '%s%s' % (related_object_prefix,
                                             user_obj_perms_model.content_object.field.related_query_name())
        if obj:
            filters = (
                Q(('%s__user' % user_obj_perms_model_ref, user)) &
                Q(('%s__permission' % user_obj_perms_model_ref, self.permission)) &
                Q(('%s__content_object' % user_obj_perms_model_ref, obj.pk))
            )
        else:
            filters = (
                Q(('%s__user' % user_obj_perms_model_ref, user)) &
                Q(('%s__permission' % user_obj_perms_model_ref, self.permission))
            )
        if self.use_groups:
            # Check the group permissions model here, not the user one again
            if group_obj_perms_model.objects.is_generic():
                raise LazyPermDeclarationError('%s appears to be using generic foreign keys. LazyPermissions '
                                               'does not support Guardian permissions maintained via generic '
                                               'foreign keys, and insists you specify a custom table joining '
                                               'object, permission and user, for example `class '
                                               'DatasetGroupObjectPermission(GroupObjectPermissionBase): '
                                               'content_object = models.ForeignKey(Dataset)` and likewise '
                                               'for Users. This is also more performant and maintains '
                                               'referential integrity.' % self.permission)
            group_obj_perms_model_ref = '%s%s' % (related_object_prefix,
                                                  group_obj_perms_model.content_object.field.related_query_name())
            if obj:
                filters |= (
                    Q(('%s__group__user' % group_obj_perms_model_ref, user)) &
                    Q(('%s__permission' % group_obj_perms_model_ref, self.permission)) &
                    Q(('%s__content_object' % group_obj_perms_model_ref, obj.pk))
                )
            else:
                filters |= (
                    Q(('%s__group__user' % group_obj_perms_model_ref, user)) &
                    Q(('%s__permission' % group_obj_perms_model_ref, self.permission))
                )
        logging.info('Converted non-declared LazyGuardianPermission %s%s filters %s',
                     '~' if self.negated else '', self.permission.codename, filters)
        return Q(filters)


class LazyPermission(BaseLazyQ):
    """
    This class supports recursive LazyPermission references, converted to lazy q or guardian checks on
    declaration then treated identically on calling.
    """

    def __init__(self, permission=None, *args, **kwargs):
        logging.info('instantiating gorm permission=%s', permission)
        super(LazyPermission, self).__init__(*args, **kwargs)
        if isinstance(permission, str):
            try:
                # This is a recursive LazyPermission reference, so add it as a sub-tree
                self.children.append(default_rules[permission])
            except KeyError:
                raise LazyPermDeclarationError('%s not found in rule_set. LazyPermission must contain a fully '
                                               'qualified guardian permission, eg, app.action_model, or another '
                                               'LazyPermission\'s key.' % permission)
            logging.info('Instantiated LazyPermission %s as LazyGuardianPermission sub-tree %s.',
                         permission, self.children)
        else:
            raise LazyPermDeclarationError('LazyPermission must be declared with either a fully qualified guardian '
                                           'permission, eg, app.action_model, or another LazyPermission\'s key.')


class RuleSet(dict):
    def test_rule(self, name, *args, **kwargs):
        return name in self and self[name].convert_to_q(*args, **kwargs)

    def rule_exists(self, name):
        return name in self

    def add_rule(self, name, pred):
        if name in self:
            raise KeyError('A rule with name `%s` already exists' % name)
        self[name] = pred

    def remove_rule(self, name):
        del self[name]
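
Because RuleSet is a plain dict subclass, its behaviour can be tried without Django. The sketch below repeats the relevant methods from the class above and registers a StubRule, a hypothetical placeholder for a real BaseLazyQ tree whose convert_to_q just echoes the user it was given.

```python
class RuleSet(dict):
    # Same methods as in the listing above.
    def test_rule(self, name, *args, **kwargs):
        return name in self and self[name].convert_to_q(*args, **kwargs)

    def add_rule(self, name, pred):
        if name in self:
            raise KeyError('A rule with name `%s` already exists' % name)
        self[name] = pred


class StubRule:
    """Hypothetical rule object; a real one would be a BaseLazyQ tree."""

    def convert_to_q(self, *args, **kwargs):
        return ("Q for", kwargs.get("user"))


rule_set = RuleSet()
rule_set.add_rule("app.model.view", StubRule())

# A known rule is converted with the runtime arguments.
converted = rule_set.test_rule("app.model.view", user="joe")
# An unknown rule short-circuits to False instead of a Q object.
unknown = rule_set.test_rule("app.model.delete", user="joe")

# Registering the same name twice is rejected.
try:
    rule_set.add_rule("app.model.view", StubRule())
    duplicate_rejected = False
except KeyError:
    duplicate_rejected = True
```

Note that test_rule returns False for an unknown name, so a caller that passes the result straight to filter() would need to handle that case first.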