from django.db.models import Q from channels.db import database_sync_to_async from apps.accounts.models import Role from apps.onboarding.consumers.base import BaseOnboardingConsumer, LogType from apps.onboarding.consumers.prompts import OnboardingPrompts from apps.onboarding.models import OnboardingFlow, OnboardingSession __all__ = ["OnboardingProgressConsumer"] class OnboardingProgressConsumer(BaseOnboardingConsumer): """ Route: /ws/onboarding/progress//// """ role_uuid: str flow_uuid: str target_user_uuid: str def parse_extra(self): self.role_uuid = self.scope["url_route"]["kwargs"].get("role_uuid") self.flow_uuid = self.scope["url_route"]["kwargs"].get("flow_uuid") self.target_user_uuid = self.scope["url_route"]["kwargs"].get("user_uuid") async def action_progress_monitor(self, data: dict): """ Analyzes the learner's progress and provides strengths, gaps, and next actions. """ if not await self.can_access_role(self.role_uuid): return await self.send_error("Forbidden: You do not have access to this role.") target_user_id = self.user.id if self.target_user_uuid and str(self.target_user_uuid) != str(self.user.uuid): target_user_id = await self.resolve_target_user_id(self.role_uuid, self.user.id, self.target_user_uuid) if not target_user_id: return await self.send_error("Forbidden: You cannot monitor this user.") await self.send_log(LogType.STATUS, "Progress Monitor is analyzing your onboarding progress...", "monitor") monitor_config = await self.get_config_by_type(self.role_uuid, 'monitor') if not monitor_config: return await self.send_error("Missing Progress Monitor AgentConfig for this role") progress_context = await self.get_role_progress_context( self.role_uuid, target_user_id, flow_uuid=self.flow_uuid, ) feedback = await self.stream_llm( monitor_config, OnboardingPrompts.progress_monitoring_prompt(progress_context), max_tokens=640, ) await self.send_log(LogType.COMPLETED, "Progress analysis complete.", { "role_uuid": self.role_uuid, "feedback": feedback, "status": progress_context.get("latest_status", "unknown"), "user_id": target_user_id, "flow_uuid": self.flow_uuid, "is_completed": progress_context.get("is_completed", False), }) ### Database Helpers ### @database_sync_to_async def get_role_progress_context(self, role_uuid, user_id, flow_uuid=None): role = Role.objects.get(uuid=role_uuid) active_flow = OnboardingFlow.objects.filter(role=role, is_active=True).order_by('-updated_at').first() scoped_flow = OnboardingFlow.objects.filter(role=role, uuid=flow_uuid).first() if flow_uuid else None sessions = OnboardingSession.objects.filter(user_id=user_id, role=role).order_by('-updated_at') if flow_uuid: sessions = sessions.filter(Q(flow__uuid=flow_uuid) | Q(flow__isnull=True, state__flow_uuid=str(flow_uuid))) latest_session = sessions.first() if not latest_session: return { "role_uuid": str(role.uuid), "role_name": role.name, "latest_status": "not_started", "flow_uuid": str((scoped_flow or active_flow).uuid) if (scoped_flow or active_flow) else None, "is_completed": False, "progress": 0, } state = latest_session.state or {} flow_for_context = latest_session.flow or scoped_flow or active_flow structure = flow_for_context.structure if flow_for_context and isinstance(flow_for_context.structure, list) else [] # Find the Final Quiz page for grading details quiz_page = next((p for p in structure if isinstance(p, dict) and p.get("meta", {}).get("page_type") == "final_quiz"), None) if quiz_page is None and structure: quiz_page = structure[-1] quiz_page_uuid = str(quiz_page.get("uuid") or "") if quiz_page else "" responses = state.get("responses", {}) quiz_responses = responses.get(quiz_page_uuid, {}) if isinstance(responses, dict) else {} final_quiz_result = state.get("final_quiz_result", {}) grading_details = final_quiz_result.get("grading_details", []) if isinstance(final_quiz_result, dict) else [] # Build QA map for the AI final_quiz_qa = [] if quiz_page: for field in quiz_page.get("fields", []): key = field.get("key") detail = next((d for d in grading_details if d.get("key") == key), {}) final_quiz_qa.append({ "label": field.get("label"), "answer": quiz_responses.get(key), "marked_correct": detail.get("correct"), "reason": detail.get("reason", "") }) return { "role_name": role.name, "latest_status": latest_session.status, "progress": state.get("progress_percentage", 0), "completed_modules": state.get("completed_modules", []), "is_completed": latest_session.status == 'completed', "final_quiz_qa": final_quiz_qa, } @database_sync_to_async def resolve_target_user_id(self, role_uuid, requester_id, target_user_uuid): from apps.accounts.models import Role, User role = Role.objects.filter(uuid=role_uuid).first() requester = User.objects.filter(id=requester_id).first() target = User.objects.filter(uuid=target_user_uuid).first() if not all([role, requester, target]): return None # Check if requester is Owner or Manager is_privileged = (role.organization.owner.id == requester_id or (bool(requester.is_manager) and role.organization.members.filter(id=requester_id).exists())) if is_privileged and role.members.filter(id=target.id).exists(): return target.id return None