2026-03-18 00:37:38 +00:00
import json
import re
from uuid import uuid4
from channels . db import database_sync_to_async
from apps . accounts . models import Role
from apps . onboarding . consumers . base import BaseOnboardingConsumer , LogType
from apps . onboarding . consumers . prompts import OnboardingPrompts
from apps . onboarding . models import OnboardingFlow
# Public export surface of this module: only the consumer class.
__all__ = [
    "OnboardingGenerateConsumer",
]
class OnboardingGenerateConsumer(BaseOnboardingConsumer):
    """
    Route: /ws/onboarding/generate/<role_uuid>/

    WebSocket consumer that generates a complete AI onboarding flow for a role:
    a curriculum of topics, one knowledge page per topic, and a final assessment
    quiz. The resulting pages are persisted as the role's ``OnboardingFlow``.
    """

    # Bounds requested from the ``random_int`` tool for the quiz length, plus the
    # length used when the tool fails or returns something unusable.
    DEFAULT_QUESTION_COUNT = 8
    MIN_QUESTION_COUNT = 6
    MAX_QUESTION_COUNT = 10
    # Percentage shown in the final-quiz copy and stored as the page's pass mark.
    FINAL_QUIZ_PASS_MARK = 80
    # Maximum number of choices kept for a model-generated select question.
    MAX_SELECT_OPTIONS = 5
    # Markdown code fences (optionally tagged ``json``) wrapping model output.
    _FENCED_BLOCK_RE = re.compile(r'```(?:json)?\s*([\s\S]*?)```', re.IGNORECASE)

    role_uuid: str

    def parse_extra(self):
        """Read the target role UUID from the URL route kwargs (``None`` if absent)."""
        self.role_uuid = self.scope["url_route"]["kwargs"].get("role_uuid")

    async def action_start_full_onboarding(self, data: dict) -> None:
        """
        Start full onboarding generation for ``self.role_uuid``.

        Dispatch of this ``action_*`` handler is presumably done by the base
        consumer. Sends an error and stops if the optional ``message`` fails
        moderation, or if the user cannot access the role. Otherwise it runs
        :meth:`run_pipeline`.
        """
        user_message = data.get("message")
        # NOTE(review): the message is only moderated here and never forwarded
        # to the pipeline. Confirm that is intentional.
        if user_message and not self.moderator.is_clean(user_message):
            return await self.send_error("Your message did not pass moderation. Please revise your input.")
        if not await self.can_access_role(self.role_uuid):
            return await self.send_error("You are not a member of this role or do not have permission to access it")
        await self.run_pipeline(await self.get_role(self.role_uuid))

    async def run_pipeline(self, role: Role) -> None:
        """
        Full orchestration pipeline for generating curriculum, knowledge content, and assessment quiz for a given role.

        Phases:
          1. The curriculum agent proposes a list of topics.
          2. The knowledge agent writes one page per topic, using knowledge-base hits as context.
          3. The assessment agent writes the final quiz. There is one retry, then a deterministic fallback.

        The pages are saved via :meth:`save_full_flow`. A missing agent config or
        an empty curriculum stops the run after notifying the client.
        """
        self.logger.info(
            "Starting onboarding generation pipeline for role=%s (uuid=%s) by user=%s",
            role.name, role.uuid, self.user.full_name,
        )

        await self.send_log(LogType.STATUS, "Phase 1: Generating Curriculum...", "curriculum")
        ca_config = await self.get_config_by_type(role.uuid, 'curriculum')
        if not ca_config:
            return await self.send_error("Missing curriculum AgentConfig for this role")
        ca_response = await self.orchestrate(
            OnboardingPrompts.curriculum_generation_prompt(),
            ca_config,
            minimum_turns=1,
            max_tokens=384,
        )
        topics = self._extract_json_list(ca_response)
        if not topics:
            self.logger.warning(
                "Curriculum generation produced no topics for role=%s (uuid=%s)", role.name, role.uuid,
            )
            return await self.send_log(
                LogType.ERROR,
                "Curriculum generation returned no topics",
                f"Curriculum generation produced no topics for role={role.name} (uuid={role.uuid})",
            )

        # The knowledge config does not depend on the topic, so resolve it once.
        ka_config = await self.get_config_by_type(role.uuid, 'knowledge')
        if not ka_config:
            return await self.send_error("Missing knowledge AgentConfig for this role")

        table_of_contents = [str(item) for item in topics]
        full_structure = []
        module_briefs = []
        for index, topic in enumerate(topics):
            await self.send_log(LogType.STATUS, f"Phase 2: Researching {topic}...", "knowledge")
            knowledge_hits = await self.fetch_knowledge_context(role.uuid, topic)
            context_markdown = self.format_knowledge_context(knowledge_hits)
            ka_response = await self.orchestrate(
                OnboardingPrompts.knowledge_generation_prompt(topic, context_markdown),
                ka_config,
                minimum_turns=2,
                max_tokens=3500,
            )
            full_structure.append({
                "title": topic,
                "body": ka_response,
                "order": index,
                "fields": [],
                "meta": {
                    "topic_index": index,
                    # Each page gets its own copy rather than a shared list.
                    "table_of_contents": list(table_of_contents),
                },
            })
            module_briefs.append({
                "topic": str(topic),
                # `or ""` keeps a missing response from becoming the text "None".
                "summary_excerpt": str(ka_response or "")[:1200],
            })

        await self.send_log(LogType.STATUS, "Phase 3: Creating final assessment quiz...", "assessment")
        aa_config = await self.get_config_by_type(role.uuid, 'assessment')
        if not aa_config:
            return await self.send_error("Missing assessment AgentConfig for this role")
        question_count = await self._resolve_question_count()
        quiz_fields = await self._generate_quiz_fields(aa_config, question_count, module_briefs, table_of_contents)
        full_structure.append(self._build_final_quiz_page(quiz_fields, order=len(full_structure)))

        await self.save_full_flow(role.uuid, full_structure)
        self.logger.info("Full onboarding generation completed: role_uuid=%s pages=%s", role.uuid, len(full_structure))
        await self.send_log(LogType.COMPLETED, "Onboarding pipeline complete and structure saved.")

    async def _resolve_question_count(self) -> int:
        """Pick the quiz length via the ``random_int`` tool, clamped to [MIN, MAX]; default on any failure."""
        low, high = self.MIN_QUESTION_COUNT, self.MAX_QUESTION_COUNT
        try:
            random_result = await self.router.handle_tool_call("random_int", {"min": low, "max": high})
        except Exception as exc:
            # Best effort: the tool is optional, so fall back to a fixed length.
            self.logger.warning("random_int tool call failed; using default question count: %s", exc)
            return self.DEFAULT_QUESTION_COUNT
        value = random_result.get("value") if isinstance(random_result, dict) else None
        if not isinstance(value, int) or isinstance(value, bool):
            return self.DEFAULT_QUESTION_COUNT
        # Do not rely on the tool honouring the requested bounds.
        return max(low, min(high, value))

    async def _generate_quiz_fields(self, aa_config, question_count, module_briefs, topic_titles):
        """Return sanitized quiz fields: first attempt, then the retry prompt, then the deterministic fallback."""
        quiz_response = await self.orchestrate(
            OnboardingPrompts.quiz_generation_prompt(question_count, module_briefs),
            aa_config,
            minimum_turns=1,
            max_tokens=1600,
        )
        quiz_fields = self._sanitize_quiz_fields(self._extract_json_list(quiz_response))
        if quiz_fields:
            return quiz_fields

        await self.send_log(LogType.STATUS, "Assessment output invalid, retrying quiz generation...", "assessment")
        retry_response = await self.orchestrate(
            OnboardingPrompts.quiz_generation_retry_prompt(question_count, module_briefs),
            aa_config,
            minimum_turns=1,
            max_tokens=1600,
        )
        quiz_fields = self._sanitize_quiz_fields(self._extract_json_list(retry_response))
        if quiz_fields:
            return quiz_fields

        await self.send_log(LogType.STATUS, "Assessment output still invalid. Using fallback final quiz.", "assessment")
        return self._build_fallback_quiz_fields(topic_titles, count=question_count)

    def _build_final_quiz_page(self, quiz_fields, order):
        """Wrap ``quiz_fields`` in the final-quiz page dict placed at position ``order``."""
        pass_mark = self.FINAL_QUIZ_PASS_MARK
        return {
            "title": "Final Assessment Quiz",
            "body": (
                "### Final Quiz\n"
                f"Answer all questions below. You need **{pass_mark}%** to complete onboarding. "
                "You can update answers and submit when ready."
            ),
            "order": order,
            "fields": quiz_fields,
            "meta": {
                "page_type": "final_quiz",
                "pass_mark": pass_mark,
            },
        }

    async def fetch_knowledge_context(self, role_uuid, topic):
        """
        Query the ``search_knowledge`` tool for ``topic`` within ``role_uuid``.

        The start and the result of the call are logged to the client. Returns
        the hit list, or ``[]`` when the tool returns a non-list or raises.
        Retrieval is best-effort, so generation continues without context.
        """
        query = f"onboarding training content for {topic}"
        await self.send_log(
            LogType.TOOL_START,
            "Accessing knowledge base: search_knowledge...",
            {"query": query, "role_uuid": str(role_uuid)},
        )
        try:
            result = await self.router.handle_tool_call(
                "search_knowledge",
                {
                    "query": query,
                    "role_uuid": str(role_uuid),
                },
            )
        except Exception as exc:
            self.logger.warning("Knowledge retrieval failed for role_uuid=%s topic=%r: %s", role_uuid, topic, exc)
            await self.send_log(LogType.ERROR, f"Knowledge retrieval failed for topic '{topic}': {exc}")
            return []
        hits = result if isinstance(result, list) else []
        await self.send_log(LogType.TOOL_RESULT, f"Retrieved {len(hits)} knowledge chunk(s)", result)
        return hits

    def _coerce_list_payload(self, payload):
        """Return ``payload`` if it is a list, else the first list under a known wrapper key, else ``[]``."""
        if isinstance(payload, list):
            return payload
        if isinstance(payload, dict):
            for key in ('questions', 'items', 'fields', 'quiz'):
                value = payload.get(key)
                if isinstance(value, list):
                    return value
        return []

    def _extract_json_list(self, text: str) -> list:
        """
        Extract a JSON list from model output, tolerating wrappers and markdown fences.

        Fenced code blocks are tried before the raw text, so that bracketed prose
        ahead of a fence (e.g. "[3] topics:") is not taken as the payload.

        Each candidate is parsed strictly first. If that yields nothing, the
        candidate is scanned and a JSON value is decoded starting at every
        ``[`` / ``{``. Dict results are unwrapped via ``_coerce_list_payload``.

        Returns the first non-empty list found, else ``[]``.
        """
        if not text:
            return []
        raw = str(text)
        candidates = [block.strip() for block in self._FENCED_BLOCK_RE.findall(raw)]
        candidates.append(raw.strip())
        decoder = json.JSONDecoder()
        for candidate in candidates:
            if not candidate:
                continue
            try:
                coerced = self._coerce_list_payload(json.loads(candidate))
            except (ValueError, RecursionError):
                coerced = []
            if coerced:
                return coerced
            for idx, char in enumerate(candidate):
                if char not in '[{':
                    continue
                try:
                    parsed, _ = decoder.raw_decode(candidate[idx:])
                except (ValueError, RecursionError):
                    continue
                coerced = self._coerce_list_payload(parsed)
                if coerced:
                    return coerced
        return []

    def format_knowledge_context(self, knowledge_hits):
        """
        Render up to five knowledge hits as numbered snippets for the knowledge prompt.

        Each snippet shows source and relevance, with its content truncated to
        1600 characters. Non-dict hits render as "Unknown Source" with no content.
        """
        if not knowledge_hits:
            return "No indexed MCP documents found for this role/topic."
        lines = []
        for idx, item in enumerate(knowledge_hits[:5]):
            source = item.get("source", "Unknown Source") if isinstance(item, dict) else "Unknown Source"
            relevance = item.get("relevance") if isinstance(item, dict) else None
            content = item.get("content", "") if isinstance(item, dict) else ""
            safe_content = str(content).strip()[:1600]
            lines.append(
                f"[{idx + 1}] Source: {source} | Relevance: {relevance}\n{safe_content}"
            )
        return "\n\n".join(lines)

    def _sanitize_quiz_fields(self, raw_fields):
        """
        Validate model-generated quiz fields and convert them to the quiz field schema.

        Non-dict items and items without a label are dropped.

        Select fields need at least two distinct options. At most
        ``MAX_SELECT_OPTIONS`` are kept, and the correct answer is always among them.

        Text and textarea fields need at least one accepted answer. Keys are
        lower-cased, spaces become underscores, and keys are made unique among
        the returned fields.
        """
        sanitized = []
        used_keys = set()
        for index, field in enumerate(raw_fields or []):
            if not isinstance(field, dict):
                continue
            label = str(field.get('label') or '').strip()
            if not label:
                continue
            field_type = str(field.get('field_type') or 'select').strip().lower()
            if field_type not in ('select', 'text', 'textarea'):
                field_type = 'select'
            validation = field.get('validation') if isinstance(field.get('validation'), dict) else {}
            explanation = str(validation.get('explanation') or '')

            if field_type == 'select':
                choice = self._select_options_and_answer(field.get('options'), validation)
                if choice is None:
                    continue
                options, correct_option = choice
                normalized_type = 'select'
                normalized_validation = {
                    'correct_option': correct_option,
                    'explanation': explanation,
                }
            else:
                accepted_answers = self._accepted_text_answers(validation)
                if not accepted_answers:
                    continue
                options = []
                normalized_type = 'textarea' if field_type == 'textarea' else 'text'
                normalized_validation = {
                    'accepted_answers': accepted_answers,
                    'explanation': explanation,
                }

            # Reserve a key only once the field is known to be kept.
            key = self._unique_quiz_key(field.get('key'), index, used_keys)
            sanitized.append({
                'key': key,
                'label': label,
                'field_type': normalized_type,
                'options': options,
                'required': True,
                'validation': normalized_validation,
            })
        return sanitized

    @staticmethod
    def _unique_quiz_key(raw_key, index, used_keys):
        """Normalize ``raw_key`` (defaulting by ``index``), suffix it until unused, record it in ``used_keys``."""
        base = str(raw_key or '').strip().lower().replace(' ', '_') or f'final_quiz_q_{index + 1}'
        key = base
        suffix = index + 1
        # Keep suffixing until unique; a single suffix can itself collide.
        while key in used_keys:
            key = f'{base}_{suffix}'
            suffix += 1
        used_keys.add(key)
        return key

    def _select_options_and_answer(self, raw_options, validation):
        """
        Return ``(options, correct_option)`` for a select field, or ``None`` if fewer than two options.

        Options are stripped, empty ones dropped, and duplicates removed (first
        occurrence wins).

        If the declared correct option is not among them, the first option is
        used instead. The list is capped at ``MAX_SELECT_OPTIONS`` and always
        contains the correct option.
        """
        stripped = (str(option).strip() for option in (raw_options if isinstance(raw_options, list) else []))
        options = list(dict.fromkeys(text for text in stripped if text))
        if len(options) < 2:
            return None
        correct_option = str(validation.get('correct_option') or '').strip()
        if correct_option not in options:
            correct_option = options[0]
        kept = options[:self.MAX_SELECT_OPTIONS]
        if correct_option not in kept:
            # The answer sat past the cut-off; swap it in so the question stays answerable.
            kept[-1] = correct_option
        return kept, correct_option

    @staticmethod
    def _accepted_text_answers(validation):
        """Return non-empty stripped ``accepted_answers``, falling back to a single ``correct_answer``."""
        raw_answers = validation.get('accepted_answers')
        answers = []
        if isinstance(raw_answers, list):
            answers = [str(item).strip() for item in raw_answers if str(item).strip()]
        if not answers:
            single = str(validation.get('correct_answer') or '').strip()
            if single:
                answers = [single]
        return answers

    def _build_fallback_quiz_fields(self, topics, count=8):
        """
        Build ``count`` deterministic quiz fields, cycling through ``topics``.

        Every third question (index 0, 3, 6, ...) is a free-text textarea; the
        rest are four-option selects whose first option is the correct one.

        Empty or blank topics fall back to "onboarding fundamentals". A
        non-positive ``count`` yields ``[]``.
        """
        safe_topics = [str(topic).strip() for topic in (topics or []) if str(topic).strip()]
        if not safe_topics:
            safe_topics = ['onboarding fundamentals']
        fallback_fields = []
        for index in range(count):
            topic = safe_topics[index % len(safe_topics)]
            key = f'final_quiz_q_{index + 1}'
            if index % 3 == 0:
                fallback_fields.append({
                    'key': key,
                    'label': f"In one or two sentences, what is the safest approach when handling {topic}?",
                    'field_type': 'textarea',
                    'options': [],
                    'required': True,
                    'validation': {
                        'accepted_answers': [
                            'best practices',
                            'documentation',
                            'quality',
                            'compliance',
                        ],
                        'explanation': 'Good answers reference documented best practices, quality checks, and compliance.',
                    },
                })
                continue
            correct = f"Use documented best practices for {topic}."
            options = [
                correct,
                f"Skip review steps for {topic} to move faster.",
                f"Rely only on assumptions instead of evidence for {topic}.",
                f"Ignore quality and compliance checks in {topic} tasks.",
            ]
            fallback_fields.append({
                'key': key,
                'label': f"Which approach is most appropriate when working on {topic}?",
                'field_type': 'select',
                'options': options,
                'required': True,
                'validation': {
                    'correct_option': correct,
                    'explanation': f"{correct} balances reliability, quality, and role expectations.",
                },
            })
        return fallback_fields

    def _normalize_structure(self, structure):
        """
        Convert pipeline page dicts into the persisted flow schema.

        Every page and field gets a fresh ``uuid``, and missing titles, bodies,
        orders, labels and types get defaults.

        Non-dict pages become empty placeholder modules, and non-dict fields are
        skipped. A ``fields`` value that is not a list or tuple is treated as empty.
        """
        normalized_pages = []
        for index, page in enumerate(structure or []):
            # An empty dict makes every lookup below safe for non-dict pages.
            page_data = page if isinstance(page, dict) else {}
            raw_fields = page_data.get('fields')
            if not isinstance(raw_fields, (list, tuple)):
                raw_fields = []
            fields = [
                self._normalize_field(field, field_index)
                for field_index, field in enumerate(raw_fields)
                if isinstance(field, dict)
            ]
            page_order = page_data.get('order')
            meta = page_data.get('meta')
            normalized_pages.append({
                'uuid': str(uuid4()),
                'title': str(page_data.get('title') or f'Module {index + 1}'),
                'body': str(page_data.get('body') or ''),
                'order': int(page_order if isinstance(page_order, int) else index),
                'fields': fields,
                'meta': meta if isinstance(meta, dict) else {},
            })
        return normalized_pages

    @staticmethod
    def _normalize_field(field, field_index):
        """
        Normalize one field dict.

        ``correct_option`` becomes ``None`` unless it is one of the options.
        ``accepted_answers`` is reduced to its non-blank entries.
        """
        key = str(field.get('key') or f'field_{field_index + 1}')
        raw_options = field.get('options') if isinstance(field.get('options'), list) else []
        options = [str(option) for option in raw_options if str(option).strip()]
        validation = field.get('validation') if isinstance(field.get('validation'), dict) else {}
        correct_option = validation.get('correct_option')
        if correct_option is not None:
            correct_option = str(correct_option)
        raw_answers = validation.get('accepted_answers')
        accepted_answers = [
            str(item).strip()
            for item in (raw_answers if isinstance(raw_answers, list) else [])
            if str(item).strip()
        ]
        return {
            'uuid': str(uuid4()),
            'key': key,
            'label': str(field.get('label') or key.replace('_', ' ').title()),
            'field_type': str(field.get('field_type') or 'text'),
            'required': bool(field.get('required', False)),
            'options': options,
            'default_value': field.get('default_value', ''),
            'validation': {
                'correct_option': correct_option if correct_option in options else None,
                'accepted_answers': accepted_answers,
                'explanation': str(validation.get('explanation') or ''),
            },
        }

    @database_sync_to_async
    def save_full_flow(self, role_uuid, structure):
        """
        Save the final nested structure to the OnboardingFlow model.

        Runs as synchronous ORM code wrapped by ``database_sync_to_async``. It
        normalizes ``structure``, then creates or updates the role's flow with
        a generated title and ``is_active=True``.

        ``Role.objects.get`` raises ``Role.DoesNotExist`` if the role is gone.
        Returns the flow instance.
        """
        role = Role.objects.get(uuid=role_uuid)
        normalized_structure = self._normalize_structure(structure)
        flow, _created = OnboardingFlow.objects.update_or_create(
            role=role,
            defaults={
                'title': f"AI Onboarding: {role.name}",
                'structure': normalized_structure,
                'is_active': True,
            },
        )
        return flow