Dynavera/apps/accounts/viewsets.py

359 lines
16 KiB
Python
Raw Normal View History

from django.contrib.auth import authenticate, login, logout
from django.core.exceptions import ValidationError as DjangoValidationError
from django.db import IntegrityError, transaction
from django.db.models import Q
from rest_framework.decorators import action
from rest_framework.exceptions import NotFound, PermissionDenied, ValidationError
from rest_framework.permissions import AllowAny, IsAuthenticated, IsAuthenticatedOrReadOnly
from rest_framework.response import Response
from rest_framework.status import HTTP_204_NO_CONTENT, HTTP_400_BAD_REQUEST, HTTP_401_UNAUTHORIZED, HTTP_403_FORBIDDEN, HTTP_404_NOT_FOUND, HTTP_200_OK, HTTP_201_CREATED
from rest_framework.viewsets import ModelViewSet, ReadOnlyModelViewSet
from apps.accounts.models import Invite, Organization, Role, User
from apps.accounts.permissions import CanManageOrganization, IsOrganizationOwnerOrMember, can_manage_organization
from apps.accounts.serializers import InviteSerializer, OrganizationSerializer, RoleSerializer, UserSerializer
class UserViewSet(ReadOnlyModelViewSet):
    """Read-only user endpoints plus session-based account actions.

    On top of the inherited ``list``/``retrieve`` routes (objects are looked
    up by ``uuid``), this viewset exposes the list-level actions ``login``,
    ``logout``, ``me``, ``session``, ``signup`` and ``change_password``.
    """

    queryset = User.objects.all()
    serializer_class = UserSerializer
    # NOTE(review): on a read-only viewset this permission lets anonymous
    # clients list and retrieve every user -- confirm that is intended.
    permission_classes = [IsAuthenticatedOrReadOnly]
    lookup_field = 'uuid'

    @action(detail=False, methods=['post'], permission_classes=[AllowAny])
    def login(self, request):
        """Authenticate with ``email_address``/``password`` and open a session.

        Returns 400 when either field is missing, 401 when authentication
        fails, and 200 with the serialized user on success.
        """
        email_address = request.data.get('email_address')
        password = request.data.get('password')
        if not email_address or not password:
            return Response({'error': 'Email and password are required'}, status=HTTP_400_BAD_REQUEST)

        email_address = User.objects.normalize_email(email_address)
        user = authenticate(request, username=email_address, password=password)
        if user is None:
            return Response({'error': 'Invalid credentials'}, status=HTTP_401_UNAUTHORIZED)

        # Resolves to django.contrib.auth.login (module global), not this method.
        login(request, user)
        return Response(
            {'user': UserSerializer(user).data, 'message': 'Login successful', 'success': True},
            status=HTTP_200_OK,
        )

    @action(detail=False, methods=['post'], permission_classes=[IsAuthenticated])
    def logout(self, request):
        """End the current session and return a 200 confirmation."""
        # Resolves to django.contrib.auth.logout (module global), not this method.
        logout(request)
        return Response({'message': 'Logout successful', 'success': True}, status=HTTP_200_OK)

    @action(detail=False, methods=['get'], permission_classes=[IsAuthenticated])
    def me(self, request):
        """Return the serialized requesting user with ``success: True`` added."""
        user_data = UserSerializer(request.user).data
        user_data['success'] = True
        return Response(user_data)

    @action(detail=False, methods=['get'], permission_classes=[AllowAny])
    def session(self, request):
        """Report whether the requester is authenticated and whether they are staff."""
        is_authenticated = request.user.is_authenticated
        return Response({
            'isAuthenticated': is_authenticated,
            'isStaff': request.user.is_staff if is_authenticated else False,
        })

    @action(detail=False, methods=['post'], permission_classes=[AllowAny])
    def signup(self, request):
        """Create a user account from the posted fields.

        Requires ``email_address``, ``first_name``, ``last_name``, ``manager``
        (a boolean, or one of the strings ``true``/``True``/``false``/
        ``False``), ``password`` and a matching ``confirm_password``.
        ``date_of_birth`` is passed through as given.

        Returns 201 on success, or 400 with a ``detail`` message when
        validation fails or ``create_user`` raises.
        """
        data = request.data

        email_address = data.get('email_address')
        if not email_address:
            return Response({'detail': 'Email address is required.', 'success': False}, status=HTTP_400_BAD_REQUEST)
        email_address = User.objects.normalize_email(email_address)
        if User.objects.filter(email_address=email_address).exists():
            return Response({'detail': 'Email address already exists.', 'success': False}, status=HTTP_400_BAD_REQUEST)

        if not data.get('first_name') or not data.get('last_name'):
            return Response({'detail': 'First and last name(s) must be provided.', 'success': False}, status=HTTP_400_BAD_REQUEST)

        # Form-encoded payloads deliver booleans as strings, so accept both.
        manager = data.get('manager')
        if not isinstance(manager, bool):
            if manager in ('true', 'True'):
                manager = True
            elif manager in ('false', 'False'):
                manager = False
            else:
                return Response({'detail': '"manager" field must be a boolean value.', 'success': False}, status=HTTP_400_BAD_REQUEST)

        # Without this guard two missing passwords compare equal (None == None)
        # and the request would reach create_user() with password=None.
        password = data.get('password')
        if not password:
            return Response({'detail': 'Password is required.', 'success': False}, status=HTTP_400_BAD_REQUEST)
        if password != data.get('confirm_password'):
            return Response({'detail': 'Passwords do not match.', 'success': False}, status=HTTP_400_BAD_REQUEST)

        try:
            User.objects.create_user(
                email_address=email_address,
                password=password,
                first_name=data.get('first_name'),
                last_name=data.get('last_name'),
                date_of_birth=data.get('date_of_birth'),
                is_manager=manager,
            )
        except (ValueError, TypeError, DjangoValidationError, IntegrityError) as e:
            return Response({'detail': str(e), 'success': False}, status=HTTP_400_BAD_REQUEST)
        return Response({'detail': 'User account created successfully.', 'success': True}, status=HTTP_201_CREATED)

    @action(detail=False, methods=['post'], permission_classes=[IsAuthenticated])
    def change_password(self, request):
        """Change the requesting user's password.

        Requires ``old_password``, ``password`` and ``confirm_password``.
        Returns 400 for a missing field or mismatched passwords, 401 when
        ``old_password`` is wrong, and 200 on success.
        """
        # Local import keeps this change self-contained within the method.
        from django.contrib.auth import update_session_auth_hash

        data = request.data
        for field in ('old_password', 'password', 'confirm_password'):
            if not data.get(field):
                return Response({'detail': f'"{field}" not provided', 'success': False}, status=HTTP_400_BAD_REQUEST)
        if data.get('password') != data.get('confirm_password'):
            return Response({'detail': 'Passwords do not match', 'success': False}, status=HTTP_400_BAD_REQUEST)

        user = request.user
        if not user.check_password(data.get('old_password')):
            return Response({'detail': 'Old password is incorrect', 'success': False}, status=HTTP_401_UNAUTHORIZED)

        user.set_password(data.get('password'))
        user.save()
        # Changing the password changes the session auth hash; refresh it so
        # the session making this request is not logged out by its own change.
        update_session_auth_hash(request, user)
        return Response({'detail': 'Password changed successfully', 'success': True}, status=HTTP_200_OK)
class OrganizationViewSet(ModelViewSet):
    """CRUD and membership actions for organizations.

    The queryset is limited to organizations the requester owns or is a
    member of. Objects are looked up by ``uuid``.
    """

    queryset = Organization.objects.all()
    serializer_class = OrganizationSerializer
    permission_classes = [IsAuthenticated]
    lookup_field = 'uuid'

    def get_permissions(self):
        """Add ``IsOrganizationOwnerOrMember`` for the object-level actions."""
        permissions = super().get_permissions()
        if self.action in ['retrieve', 'update', 'partial_update', 'destroy', 'leave', 'list_members', 'remove_member']:
            return [*permissions, IsOrganizationOwnerOrMember()]
        return permissions

    def get_queryset(self):
        """Return organizations the requester owns or belongs to."""
        return Organization.objects.filter(
            Q(owner=self.request.user) | Q(members=self.request.user)
        ).distinct()

    def perform_create(self, serializer):
        """Save the organization with the requester as owner and first member."""
        # Atomic so a failed membership insert cannot leave an organization
        # behind whose owner is not among its members.
        with transaction.atomic():
            organization = serializer.save(owner=self.request.user)
            organization.members.add(self.request.user)

    def update(self, request, *args, **kwargs):
        """Delegate to the default ``ModelViewSet`` update."""
        return super().update(request, *args, **kwargs)

    def destroy(self, request, *args, **kwargs):
        """Delegate to the default ``ModelViewSet`` destroy."""
        return super().destroy(request, *args, **kwargs)

    @action(detail=True, methods=['post'], url_path='leave')
    def leave(self, request, uuid=None):
        """Remove the requester from the organization.

        The owner may only leave when no other members remain, in which case
        the organization is deleted. A non-owner member is removed from the
        organization and from every role in it. Returns 400 when the owner
        still has other members or when the requester is not a member.
        """
        organization: Organization = self.get_object()

        if organization.owner == request.user:
            other_members = organization.members.exclude(uuid=request.user.uuid)
            if other_members.exists():
                return Response(
                    {
                        'error': (
                            'Owner cannot leave while other members/managers exist. '
                            'Remove all other members first.'
                        )
                    },
                    status=HTTP_400_BAD_REQUEST,
                )
            organization.delete()
            return Response({'message': 'Organization deleted. Owner has left successfully.'}, status=HTTP_200_OK)

        if not organization.members.filter(uuid=request.user.uuid).exists():
            return Response({'error': 'Not a member'}, status=HTTP_400_BAD_REQUEST)

        # Role and organization membership are dropped together or not at all.
        with transaction.atomic():
            for role in Role.objects.filter(organization=organization, members=request.user):
                role.members.remove(request.user)
            organization.members.remove(request.user)
        return Response({'message': 'Left organization'}, status=HTTP_200_OK)

    @action(detail=True, methods=['get'], url_path='members')
    def list_members(self, request, uuid=None):
        """Return the serialized members of the organization."""
        organization = self.get_object()
        serializer = UserSerializer(organization.members.all(), many=True)
        return Response(serializer.data)

    @action(detail=True, methods=['post'], url_path=r'member/(?P<user_uuid>[0-9a-f-]{36})/remove')
    def remove_member(self, request, uuid=None, user_uuid=None):
        """Remove the member identified by ``user_uuid`` from the organization.

        Returns 403 when the requester may not manage the organization or
        when the target is the owner, 404 when the target is not a member,
        and 200 once the member and their role memberships are removed.
        """
        organization: Organization = self.get_object()

        # Same management check that invite and role creation apply.
        # NOTE(review): IsOrganizationOwnerOrMember may already enforce this;
        # the explicit check guarantees plain members cannot remove others.
        if not can_manage_organization(request.user, organization):
            return Response(
                {'error': 'Only organization owner or managers can remove members'},
                status=HTTP_403_FORBIDDEN,
            )

        if str(organization.owner.uuid) == str(user_uuid):
            return Response({'error': 'Cannot remove owner'}, status=HTTP_403_FORBIDDEN)

        user_to_remove = organization.members.filter(uuid=user_uuid).first()
        if not user_to_remove:
            return Response({'error': 'Not found'}, status=HTTP_404_NOT_FOUND)

        # Mirror ``leave``: a removed member must not keep role memberships
        # inside the organization they no longer belong to.
        with transaction.atomic():
            for role in Role.objects.filter(organization=organization, members=user_to_remove):
                role.members.remove(user_to_remove)
            organization.members.remove(user_to_remove)
        return Response({'message': 'Removed'}, status=HTTP_200_OK)
class InviteViewSet(ModelViewSet):
    """Create, list, revoke and redeem organization invitations.

    The queryset is limited to invites of organizations the requester owns
    or is a member of. Objects are looked up by ``uuid``.
    """

    queryset = Invite.objects.all()
    serializer_class = InviteSerializer
    permission_classes = [IsAuthenticated]
    lookup_field = 'uuid'

    def _get_organization_uuid(self, request):
        """Return ``organization_uuid`` from the query string, else the body."""
        organization_uuid = request.query_params.get('organization_uuid')
        if organization_uuid in (None, ''):
            organization_uuid = request.data.get('organization_uuid')
        return organization_uuid

    def get_permissions(self):
        """Add ``CanManageOrganization`` for actions that modify an invite."""
        permissions = super().get_permissions()
        # ``create`` checks management rights itself; the inherited update
        # routes get the same guard as ``destroy`` so plain members cannot
        # edit an invite they would not be allowed to create or revoke.
        if self.action in ['update', 'partial_update', 'destroy']:
            return [*permissions, CanManageOrganization()]
        return permissions

    def get_queryset(self):
        """Return visible invites, newest first, optionally for one organization.

        Raises:
            ValidationError: if ``organization_uuid`` is supplied but the ORM
                rejects it as a lookup value.
        """
        user = self.request.user
        queryset = Invite.objects.filter(
            Q(organization__owner=user) | Q(organization__members=user)
        ).distinct().order_by('-created_at')

        organization_uuid = self._get_organization_uuid(self.request)
        if organization_uuid:
            try:
                queryset = queryset.filter(organization__uuid=organization_uuid)
            except (DjangoValidationError, ValueError):
                # The ORM rejects a malformed value while building the lookup;
                # report it as a client error instead of a 500.
                raise ValidationError(
                    {'organization_uuid': 'organization_uuid must be a valid UUID'}
                ) from None
        return queryset

    def create(self, request, *args, **kwargs):
        """Create an invite for the organization named by ``organization_uuid``.

        ``max_uses`` is read from the query string, then the body, and
        defaults to 1; it must be an integer between 1 and 1000.

        Raises:
            ValidationError: missing/malformed ``organization_uuid`` or an
                invalid ``max_uses``.
            NotFound: the organization does not exist or the requester is
                neither its owner nor a member.
            PermissionDenied: the requester may not manage the organization.
        """
        organization_uuid = self._get_organization_uuid(request)
        if not organization_uuid:
            raise ValidationError({'organization_uuid': 'organization_uuid is required'})

        try:
            organization = Organization.objects.filter(uuid=organization_uuid).filter(
                Q(owner=request.user) | Q(members=request.user)
            ).first()
        except (DjangoValidationError, ValueError):
            raise ValidationError(
                {'organization_uuid': 'organization_uuid must be a valid UUID'}
            ) from None
        if not organization:
            raise NotFound('Organization not found')
        if not can_manage_organization(request.user, organization):
            raise PermissionDenied('Only organization owner or managers can create invites')

        max_uses = request.query_params.get('max_uses') or request.data.get('max_uses', 1)
        try:
            max_uses = int(max_uses)
        except (TypeError, ValueError):
            raise ValidationError({'max_uses': 'max_uses must be an integer'}) from None
        if max_uses < 1 or max_uses > 1000:
            raise ValidationError({'max_uses': 'max_uses must be between 1 and 1000'})

        invitation = Invite.objects.create(
            organization=organization,
            created_by=request.user,
            max_uses=max_uses,
        )
        serializer = self.get_serializer(invitation)
        return Response(serializer.data, status=HTTP_201_CREATED)

    def destroy(self, request, *args, **kwargs):
        """Revoke the invite by marking it inactive; the row is kept."""
        invite = self.get_object()
        invite.is_active = False
        invite.save(update_fields=['is_active', 'updated_at'])
        return Response({'message': 'Invitation successfully revoked'}, status=HTTP_200_OK)

    @action(detail=False, methods=['post'], url_path='join')
    def join(self, request):
        """Redeem ``invite_uuid`` and add the requester to its organization.

        Returns 400 when ``invite_uuid`` is missing or the invite fails
        ``is_valid()``, 404 when no such invite exists (including a malformed
        identifier), 403 when the requester is already a member, and 200
        with the serialized organization on success. The invite is
        deactivated once ``uses`` reaches ``max_uses``.
        """
        invite_uuid = request.query_params.get('invite_uuid') or request.data.get('invite_uuid')
        if not invite_uuid:
            return Response({'error': 'invite_uuid is required'}, status=HTTP_400_BAD_REQUEST)

        with transaction.atomic():
            try:
                # Lock the row so concurrent joins cannot both pass the
                # validity check and over-count ``uses``.
                invitation = Invite.objects.select_for_update().select_related('organization').get(uuid=invite_uuid)
            except (Invite.DoesNotExist, DjangoValidationError, ValueError):
                # A malformed identifier cannot match any invite; answer 404
                # like a missing one rather than letting the ORM error escape.
                return Response({'error': 'Not Found'}, status=HTTP_404_NOT_FOUND)

            if not invitation.is_valid():
                return Response({'error': 'Invalid or expired invitation'}, status=HTTP_400_BAD_REQUEST)

            organization = invitation.organization
            if organization.members.filter(uuid=request.user.uuid).exists():
                return Response({'error': 'Already a member'}, status=HTTP_403_FORBIDDEN)

            organization.members.add(request.user)
            invitation.uses += 1
            if invitation.uses >= invitation.max_uses:
                invitation.is_active = False
            invitation.save(update_fields=['uses', 'is_active', 'updated_at'])

        return Response({
            'message': 'Joined',
            'organization': OrganizationSerializer(organization).data,
        }, status=HTTP_200_OK)
class RoleViewSet(ModelViewSet):
    """Create, list, delete and join roles inside organizations.

    The queryset is limited to roles of organizations the requester owns or
    is a member of. Objects are looked up by ``uuid``.
    """

    queryset = Role.objects.all()
    serializer_class = RoleSerializer
    permission_classes = [IsAuthenticated]
    lookup_field = 'uuid'

    def _get_organization_uuid(self, request):
        """Return ``organization_uuid`` from the query string, else the body."""
        organization_uuid = request.query_params.get('organization_uuid')
        if organization_uuid in (None, ''):
            organization_uuid = request.data.get('organization_uuid')
        return organization_uuid

    def get_permissions(self):
        """Add ``CanManageOrganization`` for actions that modify a role."""
        permissions = super().get_permissions()
        # ``create`` checks management rights itself; the inherited update
        # routes get the same guard as ``destroy`` so plain members cannot
        # edit a role they would not be allowed to create or delete.
        if self.action in ['update', 'partial_update', 'destroy']:
            return [*permissions, CanManageOrganization()]
        return permissions

    def get_queryset(self):
        """Return visible roles ordered by name, optionally for one organization.

        Raises:
            ValidationError: if ``organization_uuid`` is supplied but the ORM
                rejects it as a lookup value.
        """
        user = self.request.user
        queryset = Role.objects.filter(
            Q(organization__owner=user) | Q(organization__members=user)
        ).distinct().order_by('name')

        organization_uuid = self._get_organization_uuid(self.request)
        if organization_uuid:
            try:
                queryset = queryset.filter(organization__uuid=organization_uuid)
            except (DjangoValidationError, ValueError):
                # The ORM rejects a malformed value while building the lookup;
                # report it as a client error instead of a 500.
                raise ValidationError(
                    {'organization_uuid': 'organization_uuid must be a valid UUID'}
                ) from None
        return queryset

    def create(self, request, *args, **kwargs):
        """Create a role in the organization named by ``organization_uuid``.

        ``name`` is required and must be unique (case-insensitively) within
        the organization; ``description`` is optional. Both are stripped of
        surrounding whitespace.

        Raises:
            ValidationError: missing/malformed ``organization_uuid``, a
                missing, non-string or duplicate ``name``, or a non-string
                ``description``.
            NotFound: the organization does not exist or the requester is
                neither its owner nor a member.
            PermissionDenied: the requester may not manage the organization.
        """
        organization_uuid = self._get_organization_uuid(request)
        if not organization_uuid:
            raise ValidationError({'organization_uuid': 'organization_uuid is required'})

        try:
            organization = Organization.objects.filter(uuid=organization_uuid).filter(
                Q(owner=request.user) | Q(members=request.user)
            ).first()
        except (DjangoValidationError, ValueError):
            raise ValidationError(
                {'organization_uuid': 'organization_uuid must be a valid UUID'}
            ) from None
        if not organization:
            raise NotFound('Organization not found')
        if not can_manage_organization(request.user, organization):
            raise PermissionDenied('Only organization owner or managers can create roles')

        raw_name = request.data.get('name')
        raw_description = request.data.get('description')
        # JSON bodies may carry numbers or lists here; reject them up front
        # instead of crashing on ``.strip()``.
        if raw_name and not isinstance(raw_name, str):
            raise ValidationError({'name': 'Role name must be a string'})
        if raw_description and not isinstance(raw_description, str):
            raise ValidationError({'description': 'Role description must be a string'})

        name = (raw_name or '').strip()
        description = (raw_description or '').strip()
        if not name:
            raise ValidationError({'name': 'Role name is required'})
        if organization.roles.filter(name__iexact=name).exists():
            raise ValidationError({'name': 'A role with this name already exists in this organization'})

        role = Role.objects.create(name=name, description=description, organization=organization)
        serializer = self.get_serializer(role)
        return Response(serializer.data, status=HTTP_201_CREATED)

    def destroy(self, request, *args, **kwargs):
        """Delete the role and return 204."""
        role = self.get_object()
        role.delete()
        return Response(status=HTTP_204_NO_CONTENT)

    @action(detail=False, methods=['get'], url_path='mine')
    def mine(self, request):
        """Return every role the requester is a member of."""
        roles = Role.objects.filter(members=request.user).distinct()
        serializer = self.get_serializer(roles, many=True)
        return Response(serializer.data)

    @action(detail=True, methods=['post'], url_path='join')
    def join(self, request, uuid=None):
        """Add the requester to the role.

        Returns 200 both when the requester joins and when they already were
        a member of the role.

        Raises:
            PermissionDenied: the requester is neither owner nor member of
                the role's organization.
        """
        role = self.get_object()
        organization = role.organization

        # ``pk`` works whatever the user model's primary key is named; the
        # rest of this module identifies users by ``uuid`` rather than ``id``.
        user_pk = request.user.pk
        is_owner = organization.owner == request.user
        is_member = organization.members.filter(pk=user_pk).exists()
        if not (is_owner or is_member):
            raise PermissionDenied('Not a member of this organization')

        if role.members.filter(pk=user_pk).exists():
            return Response({'message': 'Already a member of this role'}, status=HTTP_200_OK)

        role.members.add(request.user)
        return Response({'message': 'Joined role successfully'}, status=HTTP_200_OK)