TandoorRecipes/cookbook/helper/permission_helper.py

437 lines
16 KiB
Python

import inspect
from django.conf import settings
from django.contrib import messages
from django.contrib.auth.decorators import user_passes_test
from django.core.cache import cache
from django.core.exceptions import ValidationError, ObjectDoesNotExist
from django.http import HttpResponseRedirect
from django.urls import reverse, reverse_lazy
from django.utils.translation import gettext as _
from oauth2_provider.contrib.rest_framework import TokenHasScope, TokenHasReadWriteScope
from oauth2_provider.models import AccessToken
from rest_framework import permissions
from rest_framework.permissions import SAFE_METHODS
from cookbook.models import ShareLink, Recipe, UserSpace
def get_allowed_groups(groups_required):
"""
Builds a list of all groups equal or higher to the provided groups
This means checking for guest will also allow admins to access
:param groups_required: list or tuple of groups
:return: tuple of groups
"""
groups_allowed = tuple(groups_required)
if 'guest' in groups_required:
groups_allowed = groups_allowed + ('user', 'admin')
if 'user' in groups_required:
groups_allowed = groups_allowed + ('admin',)
return groups_allowed
def has_group_permission(user, groups, no_cache=False):
"""
Tests if a given user is member of a certain group (or any higher group)
Superusers always bypass permission checks.
Unauthenticated users can't be member of any group thus always return false.
:param no_cache: (optional) do not return cached results, always check agains DB
:param user: django auth user object
:param groups: list or tuple of groups the user should be checked for
:return: True if user is in allowed groups, false otherwise
"""
if not user.is_authenticated:
return False
groups_allowed = get_allowed_groups(groups)
CACHE_KEY = hash((inspect.stack()[0][3], (user.pk, user.username, user.email), groups_allowed))
if not no_cache:
cached_result = cache.get(CACHE_KEY, default=None)
if cached_result is not None:
return cached_result
result = False
if user.is_authenticated:
if user_space := user.userspace_set.filter(active=True):
if len(user_space) != 1:
result = False # do not allow any group permission if more than one space is active, needs to be changed when simultaneous multi-space-tenancy is added
elif bool(user_space.first().groups.filter(name__in=groups_allowed)):
result = True
cache.set(CACHE_KEY, result, timeout=10)
return result
def is_object_owner(user, obj):
"""
Tests if a given user is the owner of a given object
test performed by checking user against the objects user
and create_by field (if exists)
:param user django auth user object
:param obj any object that should be tested
:return: true if user is owner of object, false otherwise
"""
if not user.is_authenticated:
return False
try:
return obj.get_owner() == user
except Exception:
return False
def is_space_owner(user, obj):
"""
Tests if a given user is the owner the space of a given object
:param user django auth user object
:param obj any object that should be tested
:return: true if user is owner of the objects space, false otherwise
"""
if not user.is_authenticated:
return False
try:
return obj.get_space().get_owner() == user
except Exception:
return False
def is_object_shared(user, obj):
"""
Tests if a given user is shared for a given object
test performed by checking user against the objects shared table
:param user django auth user object
:param obj any object that should be tested
:return: true if user is shared for object, false otherwise
"""
# TODO this could be improved/cleaned up by adding
# share checks for relevant objects
if not user.is_authenticated:
return False
return user in obj.get_shared()
def share_link_valid(recipe, share):
"""
Verifies the validity of a share uuid
:param recipe: recipe object
:param share: share uuid
:return: true if a share link with the given recipe and uuid exists
"""
try:
CACHE_KEY = f'recipe_share_{recipe.pk}_{share}'
if c := cache.get(CACHE_KEY, False):
return c
if link := ShareLink.objects.filter(recipe=recipe, uuid=share, abuse_blocked=False).first():
if 0 < settings.SHARING_LIMIT < link.request_count:
return False
link.request_count += 1
link.save()
cache.set(CACHE_KEY, True, timeout=3)
return True
return False
except ValidationError:
return False
# Django Views
def group_required(*groups_required):
"""
Decorator that tests the requesting user to be member
of at least one of the provided groups or higher level groups
:param groups_required: list of required groups
:return: true if member of group, false otherwise
"""
def in_groups(u):
return has_group_permission(u, groups_required)
return user_passes_test(in_groups, login_url='view_no_perm')
class GroupRequiredMixin(object):
"""
groups_required - list of strings, required param
"""
groups_required = None
def dispatch(self, request, *args, **kwargs):
if not has_group_permission(request.user, self.groups_required):
if not request.user.is_authenticated:
messages.add_message(request, messages.ERROR,
_('You are not logged in and therefore cannot view this page!'))
return HttpResponseRedirect(reverse_lazy('account_login') + '?next=' + request.path)
else:
messages.add_message(request, messages.ERROR,
_('You do not have the required permissions to view this page!'))
return HttpResponseRedirect(reverse_lazy('index'))
try:
obj = self.get_object()
if obj.get_space() != request.space:
messages.add_message(request, messages.ERROR,
_('You do not have the required permissions to view this page!'))
return HttpResponseRedirect(reverse_lazy('index'))
except AttributeError:
pass
return super(GroupRequiredMixin, self).dispatch(request, *args, **kwargs)
class OwnerRequiredMixin(object):
def dispatch(self, request, *args, **kwargs):
if not request.user.is_authenticated:
messages.add_message(request, messages.ERROR,
_('You are not logged in and therefore cannot view this page!'))
return HttpResponseRedirect(reverse_lazy('account_login') + '?next=' + request.path)
else:
if not is_object_owner(request.user, self.get_object()):
messages.add_message(request, messages.ERROR,
_('You cannot interact with this object as it is not owned by you!'))
return HttpResponseRedirect(reverse('index'))
try:
obj = self.get_object()
if not request.user.userspace.filter(space=obj.get_space()).exists():
messages.add_message(request, messages.ERROR,
_('You do not have the required permissions to view this page!'))
return HttpResponseRedirect(reverse_lazy('index'))
except AttributeError:
pass
return super(OwnerRequiredMixin, self).dispatch(request, *args, **kwargs)
# Django Rest Framework Permission classes
class CustomIsOwner(permissions.BasePermission):
"""
Custom permission class for django rest framework views
verifies user has ownership over object
(either user or created_by or user is request user)
"""
message = _('You cannot interact with this object as it is not owned by you!')
def has_permission(self, request, view):
return request.user.is_authenticated
def has_object_permission(self, request, view, obj):
return is_object_owner(request.user, obj)
class CustomIsOwnerReadOnly(CustomIsOwner):
def has_permission(self, request, view):
return super().has_permission(request, view) and request.method in SAFE_METHODS
def has_object_permission(self, request, view, obj):
return super().has_object_permission(request, view) and request.method in SAFE_METHODS
class CustomIsSpaceOwner(permissions.BasePermission):
"""
Custom permission class for django rest framework views
verifies if the user is the owner of the space the object belongs to
"""
message = _('You cannot interact with this object as it is not owned by you!')
def has_permission(self, request, view):
return request.user.is_authenticated and request.space.created_by == request.user
def has_object_permission(self, request, view, obj):
return is_space_owner(request.user, obj)
# TODO function duplicate/too similar name
class CustomIsShared(permissions.BasePermission):
"""
Custom permission class for django rest framework views
verifies user is shared for the object he is trying to access
"""
message = _('You cannot interact with this object as it is not owned by you!') # noqa: E501
def has_permission(self, request, view):
return request.user.is_authenticated
def has_object_permission(self, request, view, obj):
# # temporary hack to make old shopping list work with new shopping list
# if obj.__class__.__name__ in ['ShoppingList', 'ShoppingListEntry']:
# return is_object_shared(request.user, obj) or obj.created_by in list(request.user.get_shopping_share())
return is_object_shared(request.user, obj)
class CustomIsGuest(permissions.BasePermission):
"""
Custom permission class for django rest framework views
verifies the user is member of at least the group: guest
"""
message = _('You do not have the required permissions to view this page!')
def has_permission(self, request, view):
return has_group_permission(request.user, ['guest'])
def has_object_permission(self, request, view, obj):
return has_group_permission(request.user, ['guest'])
class CustomIsUser(permissions.BasePermission):
"""
Custom permission class for django rest framework views
verifies the user is member of at least the group: user
"""
message = _('You do not have the required permissions to view this page!')
def has_permission(self, request, view):
return has_group_permission(request.user, ['user'])
class CustomIsAdmin(permissions.BasePermission):
"""
Custom permission class for django rest framework views
verifies the user is member of at least the group: admin
"""
message = _('You do not have the required permissions to view this page!')
def has_permission(self, request, view):
return has_group_permission(request.user, ['admin'])
class CustomIsShare(permissions.BasePermission):
"""
Custom permission class for django rest framework views
verifies the requesting user provided a valid share link
"""
message = _('You do not have the required permissions to view this page!')
def has_permission(self, request, view):
return request.method in SAFE_METHODS and 'pk' in view.kwargs
def has_object_permission(self, request, view, obj):
share = request.query_params.get('share', None)
if share:
return share_link_valid(obj, share)
return False
class CustomRecipePermission(permissions.BasePermission):
"""
Custom permission class for recipe api endpoint
"""
message = _('You do not have the required permissions to view this page!')
def has_permission(self, request, view): # user is either at least a guest or a share link is given and the request is safe
share = request.query_params.get('share', None)
return has_group_permission(request.user, ['guest']) or (share and request.method in SAFE_METHODS and 'pk' in view.kwargs)
def has_object_permission(self, request, view, obj):
share = request.query_params.get('share', None)
if share:
return share_link_valid(obj, share)
else:
if obj.private:
return ((obj.created_by == request.user) or (request.user in obj.shared.all())) and obj.space == request.space
else:
return has_group_permission(request.user, ['guest']) and obj.space == request.space
class CustomUserPermission(permissions.BasePermission):
"""
Custom permission class for user api endpoint
"""
message = _('You do not have the required permissions to view this page!')
def has_permission(self, request, view): # a space filtered user list is visible for everyone
return has_group_permission(request.user, ['guest'])
def has_object_permission(self, request, view, obj): # object write permissions are only available for user
if request.method in SAFE_METHODS and 'pk' in view.kwargs and has_group_permission(request.user, ['guest']) and request.space in obj.userspace_set.all():
return True
elif request.user == obj:
return True
else:
return False
class CustomTokenHasScope(TokenHasScope):
"""
Custom implementation of Django OAuth Toolkit TokenHasScope class
Only difference: if any other authentication method except OAuth2Authentication is used the scope check is ignored
IMPORTANT: do not use this class without any other permission class as it will not check anything besides token scopes
"""
def has_permission(self, request, view):
if type(request.auth) == AccessToken:
return super().has_permission(request, view)
else:
return request.user.is_authenticated
class CustomTokenHasReadWriteScope(TokenHasReadWriteScope):
"""
Custom implementation of Django OAuth Toolkit TokenHasReadWriteScope class
Only difference: if any other authentication method except OAuth2Authentication is used the scope check is ignored
IMPORTANT: do not use this class without any other permission class as it will not check anything besides token scopes
"""
def has_permission(self, request, view):
if type(request.auth) == AccessToken:
return super().has_permission(request, view)
else:
return True
def above_space_limit(space): # TODO add file storage limit
"""
Test if the space has reached any limit (e.g. max recipes, users, ..)
:param space: Space to test for limits
:return: Tuple (True if above or equal any limit else false, message)
"""
r_limit, r_msg = above_space_recipe_limit(space)
u_limit, u_msg = above_space_user_limit(space)
return r_limit or u_limit, (r_msg + ' ' + u_msg).strip()
def above_space_recipe_limit(space):
"""
Test if a space has reached its recipe limit
:param space: Space to test for limits
:return: Tuple (True if above or equal limit else false, message)
"""
limit = space.max_recipes != 0 and Recipe.objects.filter(space=space).count() >= space.max_recipes
if limit:
return True, _('You have reached the maximum number of recipes for your space.')
return False, ''
def above_space_user_limit(space):
"""
Test if a space has reached its user limit
:param space: Space to test for limits
:return: Tuple (True if above or equal limit else false, message)
"""
limit = space.max_users != 0 and UserSpace.objects.filter(space=space).count() > space.max_users
if limit:
return True, _('You have more users than allowed in your space.')
return False, ''
def switch_user_active_space(user, space):
"""
Switch the currently active space of a user by setting all spaces to inactive and activating the one passed
:param user: user to change active space for
:param space: space to activate user for
:return user space object or none if not found/no permission
"""
try:
us = UserSpace.objects.get(space=space, user=user)
if not us.active:
UserSpace.objects.filter(user=user).update(active=False)
us.active = True
us.save()
return us
else:
return us
except ObjectDoesNotExist:
return None