Added extra checks with permissions and better validation

This commit is contained in:
Viswamedha Nalabotu 2026-03-08 12:56:04 +00:00
parent 00d60b3f4f
commit 8bd9a3d803

View file

@ -1,13 +1,17 @@
from django.contrib.auth import authenticate, login, logout
from django.core.exceptions import ValidationError as DjangoValidationError
from django.db import IntegrityError, transaction
from django.db.models import Q
from rest_framework.status import HTTP_204_NO_CONTENT, HTTP_400_BAD_REQUEST, HTTP_401_UNAUTHORIZED, HTTP_403_FORBIDDEN, HTTP_404_NOT_FOUND, HTTP_200_OK, HTTP_201_CREATED
from rest_framework.decorators import action
from rest_framework.exceptions import NotFound, PermissionDenied, ValidationError
from rest_framework.permissions import AllowAny, IsAuthenticated, IsAuthenticatedOrReadOnly
from rest_framework.response import Response
from rest_framework.status import HTTP_204_NO_CONTENT, HTTP_400_BAD_REQUEST, HTTP_401_UNAUTHORIZED, HTTP_403_FORBIDDEN, HTTP_404_NOT_FOUND, HTTP_200_OK, HTTP_201_CREATED
from rest_framework.viewsets import ModelViewSet, ReadOnlyModelViewSet
from apps.accounts.models import Invite, Organization, Role, User
from apps.accounts.permissions import CanManageOrganization, IsOrganizationOwnerOrMember, can_manage_organization
from apps.accounts.serializers import InviteSerializer, OrganizationSerializer, RoleSerializer, UserSerializer
class UserViewSet(ReadOnlyModelViewSet):
@ -48,10 +52,7 @@ class UserViewSet(ReadOnlyModelViewSet):
@action(detail=False, methods=['post'], permission_classes=[AllowAny])
def signup(self, request):
try:
data = request.data
except:
return Response({'detail': 'Invalid data provided.', 'success': False}, status=HTTP_400_BAD_REQUEST)
data = request.data
email_address = data.get('email_address')
if not email_address:
return Response({'detail': 'Email address is required.', 'success': False}, status=HTTP_400_BAD_REQUEST)
@ -79,7 +80,7 @@ class UserViewSet(ReadOnlyModelViewSet):
is_manager=manager,
)
return Response({'detail': 'User account created successfully.', 'success': True}, status=HTTP_201_CREATED)
except Exception as e:
except (ValueError, TypeError, DjangoValidationError, IntegrityError) as e:
return Response({'detail': str(e), 'success': False}, status=HTTP_400_BAD_REQUEST)
@action(detail=False, methods=['post'], permission_classes=[IsAuthenticated])
@ -98,14 +99,18 @@ class UserViewSet(ReadOnlyModelViewSet):
user.save()
return Response({'detail': 'Password changed successfully', 'success': True}, status=HTTP_200_OK)
class OrganizationViewSet(ModelViewSet):
queryset = Organization.objects.all()
serializer_class = OrganizationSerializer
permission_classes = [IsAuthenticated]
lookup_field = 'uuid'
def get_permissions(self):
permissions = super().get_permissions()
if self.action in ['retrieve', 'update', 'partial_update', 'destroy', 'leave', 'list_members', 'remove_member']:
return [*permissions, IsOrganizationOwnerOrMember()]
return permissions
def get_queryset(self):
return Organization.objects.filter(
Q(owner=self.request.user) | Q(members=self.request.user)
@ -116,79 +121,40 @@ class OrganizationViewSet(ModelViewSet):
organization.members.add(self.request.user)
def update(self, request, *args, **kwargs):
if not request.user.is_manager:
return Response({'error': 'Forbidden'}, status=HTTP_403_FORBIDDEN)
return super().update(request, *args, **kwargs)
@action(detail=True, methods=['get'], url_path='invite')
def list_invites(self, request, uuid=None):
organization = self.get_object()
invites = organization.invites.all()
serializer = InviteSerializer(invites, many=True, context={'request': request})
return Response(serializer.data)
@action(detail=True, methods=['post'], url_path='create-invite')
def create_invite(self, request, uuid=None):
organization = self.get_object()
if not request.user.is_manager:
return Response({'error': 'Forbidden'}, status=HTTP_403_FORBIDDEN)
max_uses = request.query_params.get('max_uses') or request.data.get('max_uses', 1)
invitation = Invite.objects.create(
organization=organization,
created_by=request.user,
max_uses=int(max_uses) if str(max_uses).isdigit() else 1
)
return Response(InviteSerializer(invitation, context={'request': request}).data)
@action(detail=True, methods=['delete'], url_path=r'revoke-invite/(?P<invite_uuid>[0-9a-f-]{36})')
def revoke_invite(self, request, uuid=None, invite_uuid=None):
organization = self.get_object()
if not request.user.is_manager:
return Response({'error': 'Only managers can revoke invites'}, status=HTTP_403_FORBIDDEN)
invite = organization.invites.filter(uuid=invite_uuid).first()
if not invite:
return Response({'error': 'Invalid invitation uuid or not found in this organization'}, status=HTTP_404_NOT_FOUND)
invite.is_active = False
invite.save()
return Response({'message': 'Invitation successfully revoked'}, status=HTTP_200_OK)
@action(detail=False, methods=['post'], url_path='join/(?P<invite_uuid>[0-9a-f-]{36})')
def join(self, request, invite_uuid=None):
try:
invitation = Invite.objects.get(uuid=invite_uuid)
except Invite.DoesNotExist:
return Response({'error': 'Not Found'}, status=HTTP_404_NOT_FOUND)
if not invitation.is_valid():
return Response({'error': 'Invalid or expired invitation'}, status=HTTP_400_BAD_REQUEST)
organization = invitation.organization
if organization.members.filter(uuid=request.user.uuid).exists():
return Response({'error': 'Already a member'}, status=HTTP_403_FORBIDDEN)
organization.members.add(request.user)
invitation.uses += 1
if invitation.uses >= invitation.max_uses:
invitation.is_active = False
invitation.save()
return Response({
'message': 'Joined',
'organization': OrganizationSerializer(organization).data
})
def destroy(self, request, *args, **kwargs):
return super().destroy(request, *args, **kwargs)
@action(detail=True, methods=['post'], url_path='leave')
def leave(self, request, uuid=None):
organization = self.get_object()
organization: Organization = self.get_object()
if organization.owner == request.user:
return Response({'error': 'Owner cannot leave'}, status=HTTP_403_FORBIDDEN)
other_members = organization.members.exclude(uuid=request.user.uuid)
if other_members.exists():
return Response(
{
'error': (
'Owner cannot leave while other members/managers exist. '
'Remove all other members first.'
)
},
status=HTTP_400_BAD_REQUEST,
)
organization.delete()
return Response({'message': 'Organization deleted. Owner has left successfully.'}, status=HTTP_200_OK)
if not organization.members.filter(uuid=request.user.uuid).exists():
return Response({'error': 'Not a member'}, status=HTTP_400_BAD_REQUEST)
roles = Role.objects.filter(organization=organization, members=request.user)
for role in roles:
role.members.remove(request.user)
organization.members.remove(request.user)
return Response({'message': 'Left organization'})
return Response({'message': 'Left organization'}, status=HTTP_200_OK)
@action(detail=True, methods=['get'], url_path='members')
def list_members(self, request, uuid=None):
@ -198,72 +164,195 @@ class OrganizationViewSet(ModelViewSet):
@action(detail=True, methods=['post'], url_path=r'member/(?P<user_uuid>[0-9a-f-]{36})/remove')
def remove_member(self, request, uuid=None, user_uuid=None):
if not request.user.is_manager:
return Response({'error': 'Forbidden'}, status=HTTP_403_FORBIDDEN)
organization = self.get_object()
organization: Organization = self.get_object()
if str(organization.owner.uuid) == str(user_uuid):
return Response({'error': 'Cannot remove owner'}, status=HTTP_403_FORBIDDEN)
user_to_remove = organization.members.filter(uuid=user_uuid).first()
if not user_to_remove:
return Response({'error': 'Not found'}, status=HTTP_404_NOT_FOUND)
organization.members.remove(user_to_remove)
return Response({'message': 'Removed'})
return Response({'message': 'Removed'}, status=HTTP_200_OK)
@action(detail=True, methods=['get', 'post'], url_path='role')
def roles(self, request, uuid=None):
organization = self.get_object()
if request.method == 'GET':
return Response(RoleSerializer(organization.roles.all(), many=True).data)
class InviteViewSet(ModelViewSet):
queryset = Invite.objects.all()
serializer_class = InviteSerializer
permission_classes = [IsAuthenticated]
lookup_field = 'uuid'
if not request.user.is_manager:
return Response({'error': 'Forbidden'}, status=HTTP_403_FORBIDDEN)
def _get_organization_uuid(self, request):
organization_uuid = request.query_params.get('organization_uuid')
if organization_uuid in (None, ''):
organization_uuid = request.data.get('organization_uuid')
return organization_uuid
def get_permissions(self):
permissions = super().get_permissions()
if self.action in ['destroy']:
return [*permissions, CanManageOrganization()]
return permissions
def get_queryset(self):
user = self.request.user
queryset = Invite.objects.filter(
Q(organization__owner=user) | Q(organization__members=user)
).distinct().order_by('-created_at')
organization_uuid = self._get_organization_uuid(self.request)
if organization_uuid:
queryset = queryset.filter(organization__uuid=organization_uuid)
return queryset
def create(self, request, *args, **kwargs):
organization_uuid = self._get_organization_uuid(request)
if not organization_uuid:
raise ValidationError({'organization_uuid': 'organization_uuid is required'})
organization = Organization.objects.filter(uuid=organization_uuid).filter(
Q(owner=request.user) | Q(members=request.user)
).first()
if not organization:
raise NotFound('Organization not found')
if not can_manage_organization(request.user, organization):
raise PermissionDenied('Only organization owner or managers can create invites')
max_uses = request.query_params.get('max_uses') or request.data.get('max_uses', 1)
try:
max_uses = int(max_uses)
except (TypeError, ValueError):
raise ValidationError({'max_uses': 'max_uses must be an integer'})
if max_uses < 1 or max_uses > 1000:
raise ValidationError({'max_uses': 'max_uses must be between 1 and 1000'})
invitation = Invite.objects.create(
organization=organization,
created_by=request.user,
max_uses=max_uses,
)
serializer = self.get_serializer(invitation)
return Response(serializer.data, status=HTTP_201_CREATED)
def destroy(self, request, *args, **kwargs):
invite = self.get_object()
invite.is_active = False
invite.save(update_fields=['is_active', 'updated_at'])
return Response({'message': 'Invitation successfully revoked'}, status=HTTP_200_OK)
@action(detail=False, methods=['post'], url_path='join')
def join(self, request):
invite_uuid = request.query_params.get('invite_uuid') or request.data.get('invite_uuid')
if not invite_uuid:
return Response({'error': 'invite_uuid is required'}, status=HTTP_400_BAD_REQUEST)
with transaction.atomic():
try:
invitation = Invite.objects.select_for_update().select_related('organization').get(uuid=invite_uuid)
except Invite.DoesNotExist:
return Response({'error': 'Not Found'}, status=HTTP_404_NOT_FOUND)
if not invitation.is_valid():
return Response({'error': 'Invalid or expired invitation'}, status=HTTP_400_BAD_REQUEST)
organization = invitation.organization
if organization.members.filter(uuid=request.user.uuid).exists():
return Response({'error': 'Already a member'}, status=HTTP_403_FORBIDDEN)
organization.members.add(request.user)
invitation.uses += 1
if invitation.uses >= invitation.max_uses:
invitation.is_active = False
invitation.save(update_fields=['uses', 'is_active', 'updated_at'])
return Response({
'message': 'Joined',
'organization': OrganizationSerializer(organization).data,
}, status=HTTP_200_OK)
class RoleViewSet(ModelViewSet):
queryset = Role.objects.all()
serializer_class = RoleSerializer
permission_classes = [IsAuthenticated]
lookup_field = 'uuid'
def _get_organization_uuid(self, request):
organization_uuid = request.query_params.get('organization_uuid')
if organization_uuid in (None, ''):
organization_uuid = request.data.get('organization_uuid')
return organization_uuid
def get_permissions(self):
permissions = super().get_permissions()
if self.action in ['destroy']:
return [*permissions, CanManageOrganization()]
return permissions
def get_queryset(self):
user = self.request.user
queryset = Role.objects.filter(
Q(organization__owner=user) | Q(organization__members=user)
).distinct().order_by('name')
organization_uuid = self._get_organization_uuid(self.request)
if organization_uuid:
queryset = queryset.filter(organization__uuid=organization_uuid)
return queryset
def create(self, request, *args, **kwargs):
organization_uuid = self._get_organization_uuid(request)
if not organization_uuid:
raise ValidationError({'organization_uuid': 'organization_uuid is required'})
organization = Organization.objects.filter(uuid=organization_uuid).filter(
Q(owner=request.user) | Q(members=request.user)
).first()
if not organization:
raise NotFound('Organization not found')
if not can_manage_organization(request.user, organization):
raise PermissionDenied('Only organization owner or managers can create roles')
name = (request.data.get('name') or '').strip()
description = (request.data.get('description') or '').strip()
if not name:
return Response({'error': 'Role name is required'}, status=HTTP_400_BAD_REQUEST)
raise ValidationError({'name': 'Role name is required'})
if organization.roles.filter(name__iexact=name).exists():
return Response({'error': 'A role with this name already exists in this organization'}, status=HTTP_400_BAD_REQUEST)
raise ValidationError({'name': 'A role with this name already exists in this organization'})
role = Role.objects.create(name=name, description=description, organization=organization)
return Response(RoleSerializer(role).data, status=HTTP_201_CREATED)
serializer = self.get_serializer(role)
return Response(serializer.data, status=HTTP_201_CREATED)
@action(detail=False, methods=['get'], url_path='role/mine')
def my_roles(self, request):
roles = Role.objects.filter(members=request.user).distinct()
serializer = RoleSerializer(roles, many=True)
return Response(serializer.data)
@action(detail=True, methods=['delete'], url_path='role/(?P<role_uuid>[0-9a-f-]{36})')
def delete_role(self, request, uuid=None, role_uuid=None):
if not request.user.is_manager:
return Response({'error': 'Forbidden'}, status=HTTP_403_FORBIDDEN)
role = Role.objects.filter(uuid=role_uuid, organization__uuid=uuid)
if not role.exists():
return Response({'error': 'Not found'}, status=HTTP_404_NOT_FOUND)
def destroy(self, request, *args, **kwargs):
role = self.get_object()
role.delete()
return Response(status=HTTP_204_NO_CONTENT)
@action(detail=True, methods=['post'], url_path='role/(?P<role_uuid>[0-9a-f-]{36})/join')
def join_role(self, request, uuid=None, role_uuid=None):
organization = self.get_object()
@action(detail=False, methods=['get'], url_path='mine')
def mine(self, request):
roles = Role.objects.filter(members=request.user).distinct()
serializer = self.get_serializer(roles, many=True)
return Response(serializer.data)
role = Role.objects.filter(uuid=role_uuid, organization=organization).first()
if not role:
return Response({'error': 'Role not found'}, status=HTTP_404_NOT_FOUND)
@action(detail=True, methods=['post'], url_path='join')
def join(self, request, uuid=None):
role = self.get_object()
organization = role.organization
if not organization.members.filter(id=request.user.id).exists() and organization.owner != request.user:
return Response({'error': 'Not a member of this organization'}, status=HTTP_403_FORBIDDEN)
is_owner = organization.owner == request.user
is_member = organization.members.filter(id=request.user.id).exists()
if not (is_owner or is_member):
raise PermissionDenied('Not a member of this organization')
if role.members.filter(id=request.user.id).exists():
return Response({'message': 'Already a member of this role'}, status=HTTP_200_OK)
role.members.add(request.user)
return Response({'message': 'Joined role successfully'}, status=HTTP_200_OK)
return Response({'message': 'Joined role successfully'}, status=HTTP_200_OK)