Dynavera/docs/orchestration-pseudocode.md
2026-03-23 15:02:03 +00:00


Orchestration Pseudocode

This document provides pseudocode for the core runtime components of Dynavera. Source references point to the submitted repository.


1. Multi-Turn Orchestration Loop

Source: apps/onboarding/consumers/base.py:77-132

The orchestrate method is the central inference loop. It accumulates a message history, calls the GPU inference endpoint with MCP tool definitions attached, and executes any tool calls the model requests. It returns once the model produces a final text response and the minimum-turn threshold has been met, or falls back to the last text response when max_turns is exhausted.

function ORCHESTRATE(message, config, min_turns, max_turns, max_tokens):
    // min_turns, max_turns and max_tokens are optional; omitted values fall back to configured defaults
    messages ← [ {role: system,  content: config.system_prompt},
                  {role: user,    content: message} ]
    resolved_max_tokens ← max_tokens if provided, otherwise the configured default
    last_content ← ""                            // most recent text response, if any

    for turn = 1 to max_turns do
        emit THOUGHT status to WebSocket client

        response ← POST /v1/chat/completions {
            messages:    messages,
            tools:       MCP_ROUTER.get_tool_definitions(),
            tool_choice: "auto",
            max_tokens:  resolved_max_tokens
        }

        ai_msg ← response.choices[0].message
        append ai_msg to messages

        if ai_msg contains tool_calls then
            for each call in ai_msg.tool_calls do
                emit TOOL_START {name, args} to client
                result ← MCP_ROUTER.handle(call.name, call.args)
                emit TOOL_RESULT {result} to client
                append {role: tool, name: call.name, content: result} to messages
            end for
            continue                              // re-enter loop with updated context

        else                                     // model returned a text response
            content ← censor(ai_msg.content)
            last_content ← content
            if turn < min_turns then
                append force_reasoning_prompt to messages
                continue                         // too early to return: force another pass
            end if
            return content
        end if
    end for

    return last_content                          // fallback if max_turns reached

Key design points:

  • Tool results are injected back into the message history before the next inference call, allowing the model to reason over retrieved evidence.
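  • min_turns sets the minimum number of loop turns before a text response is accepted. A response that arrives earlier stays in the history and is followed by force_reasoning_prompt, so the model makes another pass; this is intended to improve output quality on complex generation tasks.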
  • All status events (THOUGHT, TOOL_START, TOOL_RESULT, COMPLETED) are streamed to the client over the WebSocket, making the reasoning process inspectable in the UI.
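
The control flow above can be sketched in Python as follows. This is a minimal illustration, not the code in base.py: the infer and handle_tool callables stand in for the inference endpoint and MCP_ROUTER.handle, the default values and the "user" role used for the forced-reasoning prompt are assumptions, and WebSocket events and censoring are left out.

```python
from typing import Any, Callable

Message = dict[str, Any]


def orchestrate(
    message: str,
    system_prompt: str,
    infer: Callable[[list[Message]], Message],   # hypothetical stand-in for the inference call
    handle_tool: Callable[[str, dict], Any],     # hypothetical stand-in for MCP_ROUTER.handle
    min_turns: int = 1,                          # assumed default
    max_turns: int = 8,                          # assumed default
    force_reasoning_prompt: str = "Review your draft and refine it.",  # assumed wording
) -> str:
    messages: list[Message] = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": message},
    ]
    last_content = ""  # most recent text response, returned if max_turns is hit

    for turn in range(1, max_turns + 1):
        ai_msg = infer(messages)
        messages.append(ai_msg)

        tool_calls = ai_msg.get("tool_calls") or []
        if tool_calls:
            # Feed every tool result back into the history, then infer again.
            for call in tool_calls:
                result = handle_tool(call["name"], call["args"])
                messages.append(
                    {"role": "tool", "name": call["name"], "content": str(result)}
                )
            continue

        last_content = ai_msg.get("content") or ""
        if turn < min_turns:
            # Too early to return: ask for another pass over the draft.
            messages.append({"role": "user", "content": force_reasoning_prompt})
            continue
        return last_content

    return last_content
```

Passing the inference call in as a parameter keeps the loop testable without a GPU endpoint: a test can supply a function that replays canned assistant messages.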

2. MCP Tool Dispatch

Source: apps/onboarding/mcp.py:42-127

The MCPRouter exposes a fixed set of approved tools to the model. Tool definitions are generated at class load time from method-level @mcp_tool decorator metadata.
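
One way to build tool definitions from decorator metadata when a class is created is sketched below. The decorator signature, the _mcp_meta attribute, the ToolRouter base class and the OpenAI-style definition shape are all assumptions made for illustration; the actual @mcp_tool and MCPRouter in mcp.py may differ, and this sketch is synchronous for brevity.

```python
from typing import Any, Callable


def mcp_tool(name: str, description: str, parameters: dict[str, Any]) -> Callable:
    """Attach tool metadata to a method (hypothetical signature)."""
    def decorate(func: Callable) -> Callable:
        func._mcp_meta = {
            "name": name,
            "description": description,
            "parameters": parameters,
        }
        return func
    return decorate


class ToolRouter:
    """Hypothetical base class: collects decorated methods once per subclass."""
    _tools: dict[str, str] = {}
    _definitions: list[dict[str, Any]] = []

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        cls._tools = {}
        cls._definitions = []
        # Runs when the subclass is defined, not on every request.
        for attr_name, attr in list(vars(cls).items()):
            meta = getattr(attr, "_mcp_meta", None)
            if meta is None:
                continue
            cls._tools[meta["name"]] = attr_name
            cls._definitions.append({"type": "function", "function": meta})

    def get_tool_definitions(self) -> list[dict[str, Any]]:
        return list(self._definitions)

    def handle(self, tool_name: str, args: dict[str, Any]) -> Any:
        attr_name = self._tools.get(tool_name)
        if attr_name is None:
            return {"error": "Tool not found"}
        try:
            return getattr(self, attr_name)(args)
        except Exception as exc:  # report failures to the model instead of raising
            return {"error": str(exc)}


class DemoRouter(ToolRouter):
    @mcp_tool(
        "echo",
        "Return the input text.",
        {"type": "object", "properties": {"text": {"type": "string"}}},
    )
    def echo(self, args: dict[str, Any]) -> dict[str, Any]:
        return {"text": args["text"]}
```

Because only decorated methods are registered, the model can reach nothing on the router beyond the approved set.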

function MCP_ROUTER.handle(tool_name, args):
    method ← tool_name_to_method_map[tool_name]
    if method is None then
        return {error: "Tool not found"}
    end if

    try
        return await method(args)
    catch Exception as e
        return {error: e.message}
    end try

// search_knowledge (lines 78-127)
function search_knowledge(args):
    role ← the role identified by args.role_uuid
    query_vector ← embedding returned by POST /v1/embeddings {input: args.query}
    chunks ← SELECT content, metadata,
                     CosineDistance(embedding, query_vector) AS distance
              FROM KnowledgeChunk
              WHERE organization = role.organization
                AND (role = args.role_uuid OR role IS NULL)
                AND is_active = true
              ORDER BY distance ASC
              LIMIT 5
    return [{content: chunk.content, source: chunk.metadata.source,
             relevance: 1 - chunk.distance} for chunk in chunks]

// update_progress (lines 129-159)
function update_progress(args):
    session ← OnboardingSession.get(uuid=args.session_uuid)
    if args.score     → session.state.last_score ← args.score
    if args.completed → append args.completed_module to session.state.completed_modules
    session.save()
    return {status: "success", new_state: session.state}
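
The ranking step in search_knowledge can be illustrated in plain Python. This is a sketch only: the pseudocode above performs the ordering inside the database query, whereas here chunks are assumed to be in-memory dictionaries with embedding, content and metadata keys, and rank_chunks is a hypothetical helper name.

```python
import math
from typing import Any


def cosine_distance(a: list[float], b: list[float]) -> float:
    """Return 1 - cosine similarity; 1.0 if either vector has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 1.0
    return 1.0 - dot / (norm_a * norm_b)


def rank_chunks(
    query_vector: list[float],
    chunks: list[dict[str, Any]],
    limit: int = 5,
) -> list[dict[str, Any]]:
    """Order chunks by ascending distance and report relevance as 1 - distance."""
    scored = [(cosine_distance(c["embedding"], query_vector), c) for c in chunks]
    scored.sort(key=lambda pair: pair[0])
    return [
        {
            "content": chunk["content"],
            "source": chunk["metadata"].get("source"),
            "relevance": 1.0 - distance,
        }
        for distance, chunk in scored[:limit]
    ]
```

With this definition a chunk pointing in the same direction as the query has relevance 1, and an orthogonal chunk has relevance 0.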

3. Knowledge Ingestion Pipeline

Source: apps/knowledge/tasks.py:45-117

task ingest_training_file(file_uuid):
    file ← TrainingFile.get(uuid=file_uuid)
    file.status ← "ingesting";  file.save()

    raw_text ← extract_text(file)            // PDF / DOCX / TXT

    all_chunks ← []
    for segment in split(raw_text, size=CHUNK_SIZE) do
        response ← POST /v1/semantic-chunk {
            text:      segment,
            threshold: SEMANTIC_CHUNK_THRESHOLD
        }
        for (chunk_text, embedding) in zip(response.chunks, response.embeddings) do
            all_chunks.append(KnowledgeChunk {
                content:   chunk_text,
                embedding: embedding,         // 768-dim vector
                role:      file.role,
                metadata:  {source: file.file_name}
            })
        end for
    end for

    existing_hashes ← hashes of chunks already stored    // used to skip duplicates
    new_chunks ← [c for c in all_chunks if c.hash not in existing_hashes]
    KnowledgeChunk.bulk_create(new_chunks)

    file.status ← "embedded";  file.save()
    trigger update_agent_prompts_from_file(file.role.uuid)
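
The deduplication step before bulk_create might look like the sketch below. The pseudocode only says that chunks carry a hash; hashing the whitespace-normalized content with SHA-256, the dictionary chunk shape, and the helper names content_hash and filter_new_chunks are assumptions.

```python
import hashlib
from typing import Any, Iterable


def content_hash(text: str) -> str:
    """Hash chunk text after collapsing whitespace (assumed hashing scheme)."""
    normalized = " ".join(text.split())
    return hashlib.sha256(normalized.encode("utf-8")).hexdigest()


def filter_new_chunks(
    chunks: list[dict[str, Any]],
    existing_hashes: Iterable[str],
) -> list[dict[str, Any]]:
    """Drop chunks already stored, and duplicates within the same batch."""
    seen = set(existing_hashes)
    fresh: list[dict[str, Any]] = []
    for chunk in chunks:
        digest = content_hash(chunk["content"])
        if digest in seen:
            continue
        seen.add(digest)
        fresh.append({**chunk, "hash": digest})
    return fresh
```

Tracking hashes seen within the batch as well as stored ones means re-ingesting a file, or ingesting a file that repeats a passage, does not create duplicate rows.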

4. Onboarding Generation Pipeline (CA → KA → AA)

Source: apps/onboarding/consumers/generate.py:34-124

function run_pipeline(role):
    // Phase 1 — Curriculum Agent
    context ← search_knowledge(role, query=role.name + " responsibilities")
    topics  ← ORCHESTRATE(curriculum_generation_prompt(role, context), CA_config)
              → parsed as JSON list of topic strings (max 15)

    // Phase 2 — Knowledge Agent (one pass per topic)
    full_structure ← []
    for each topic in topics do
        hits    ← search_knowledge(role, query=topic)
        content ← ORCHESTRATE(knowledge_generation_prompt(topic, hits), KA_config,
                               min_turns=2, max_tokens=3500)
        full_structure.append({title: topic, body: content})
    end for

    // Phase 3 — Assessment Agent
    quiz_fields ← ORCHESTRATE(quiz_generation_prompt(topics, module_briefs), AA_config)
                 → sanitised and validated; fallback quiz generated if JSON invalid

    full_structure.append({title: "Final Assessment Quiz", fields: quiz_fields,
                            meta: {pass_mark: 80}})

    OnboardingFlow.save(role, full_structure)
    emit COMPLETED to client
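
Phase 1 parses the model output as a JSON list of at most 15 topic strings. A defensive parser for that step could look like this sketch; parse_topics is a hypothetical helper, and the handling of surrounding prose, non-string items and repeated topics is an assumption, not documented behavior.

```python
import json

MAX_TOPICS = 15  # cap stated in the pipeline pseudocode


def parse_topics(raw: str, max_topics: int = MAX_TOPICS) -> list[str]:
    """Extract a list of topic strings from model output; [] if none is found."""
    # Models often wrap JSON in prose, so parse only the outermost [...] span.
    start, end = raw.find("["), raw.rfind("]")
    if start == -1 or end <= start:
        return []
    try:
        data = json.loads(raw[start:end + 1])
    except json.JSONDecodeError:
        return []
    if not isinstance(data, list):
        return []

    topics: list[str] = []
    for item in data:
        if not isinstance(item, str):
            continue  # ignore numbers, nulls and nested structures
        title = item.strip()
        if title and title not in topics:
            topics.append(title)
    return topics[:max_topics]
```

Returning an empty list instead of raising lets the caller decide whether to retry the Curriculum Agent or abort the pipeline.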

Grading strategy:

  • Multiple-choice questions: deterministic string comparison against correct_option
  • Free-text / textarea responses: agent-graded by the AA at session completion
  • Per-question outcomes persisted in session state for audit and feedback rendering
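
The deterministic multiple-choice path can be sketched as below. The question and answer shapes (id, correct_option, an answers mapping keyed by question id), the whitespace trimming, and reading pass_mark as a percentage are assumptions made for illustration.

```python
from typing import Any


def grade_multiple_choice(
    questions: list[dict[str, Any]],
    answers: dict[str, str],
    pass_mark: float = 80.0,  # assumed to be a percentage
) -> dict[str, Any]:
    """Compare each answer with correct_option and keep per-question outcomes."""
    outcomes = []
    for question in questions:
        given = answers.get(question["id"], "")
        is_correct = given.strip() == question["correct_option"].strip()
        outcomes.append(
            {"id": question["id"], "given": given, "correct": is_correct}
        )

    total = len(outcomes)
    correct = sum(1 for outcome in outcomes if outcome["correct"])
    score = 100.0 * correct / total if total else 0.0
    return {"score": score, "passed": score >= pass_mark, "outcomes": outcomes}
```

The outcomes list is the part that would be persisted in session state; free-text answers would bypass this function and go to the Assessment Agent.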