import datetime
import json
import math
import statistics
import time
from pathlib import Path

import httpx
from django.conf import settings
from django.core.management.base import BaseCommand, CommandError
from django.db import connection
from django.db.models import Q
from pgvector.django import CosineDistance

from apps.accounts.models import Organization, Role, User
from apps.knowledge.models import KnowledgeChunk, TrainingFile
from apps.onboarding.models import OnboardingSession


class Command(BaseCommand):
    """Benchmark the inference server, pgvector retrieval, and database.

    Each ``_bench_*`` step stores its findings under its own key in
    ``self.results``. A failing step records ``{"error": ...}`` (or
    ``{"skipped": ...}``) instead of raising, so the remaining steps still
    run. A console summary is printed and a Markdown report is written at
    the end.
    """

    help = "Benchmark Dynavera system components: GPU inference server, pgvector retrieval, and database."

    def add_arguments(self, parser):
        parser.add_argument("--runs", type=int, default=5, help="Repetitions per latency benchmark (default: 5)")
        parser.add_argument("--out", type=str, default="benchmarks", help="Output directory for the results file (default: benchmarks/)")
        parser.add_argument("--skip-llm", action="store_true", help="Skip LLM inference benchmarks (each prompt takes ~30 s)")

    def handle(self, *args, **options):
        """Run every benchmark step in order, then print a summary and save the report.

        Raises:
            CommandError: if ``--runs`` is less than 1.
        """
        self.runs = options["runs"]
        # _stats() needs at least one sample; with --runs 0 (or negative) the
        # run would otherwise crash mid-way, before any report is written.
        if self.runs < 1:
            raise CommandError("--runs must be at least 1.")
        self.skip_llm = options["skip_llm"]
        self.out_dir = Path(options["out"])
        # parents=True so nested output paths (e.g. --out reports/2024) work.
        self.out_dir.mkdir(parents=True, exist_ok=True)
        self.results = {}

        self.stdout.write(self.style.SUCCESS("\n=== Dynavera System Benchmark ==="))
        self.stdout.write(f" Inference endpoint : {settings.INFERENCE_URL}")
        self.stdout.write(f" Repetitions : {self.runs}")
        self.stdout.write(f" LLM benchmarks : {'SKIPPED (--skip-llm)' if self.skip_llm else 'ENABLED'}\n")

        self._bench_health()
        self._bench_embeddings()
        self._bench_chunking()
        if not self.skip_llm:
            self._bench_llm()
        self._bench_database()
        self._bench_retrieval()

        self._print_summary()
        self._save_report()

    def _req(self, method, path, **kwargs):
        """Send a request to the inference server and return the decoded JSON body.

        ``path`` is appended to ``settings.INFERENCE_URL``; extra keyword
        arguments are forwarded to ``httpx.request``.

        Raises:
            httpx.HTTPStatusError: on a non-2xx response.

        Note: the top-level ``httpx.request`` helper does not pool
        connections, so each timed call includes connection setup.
        """
        url = f"{settings.INFERENCE_URL}{path}"
        resp = httpx.request(method, url, auth=settings.INFERENCE_AUTH, timeout=180, **kwargs)
        resp.raise_for_status()
        return resp.json()

    def _time_fn(self, fn):
        """Call ``fn()`` and return ``(result, elapsed_milliseconds)``."""
        t0 = time.perf_counter()
        result = fn()
        return result, (time.perf_counter() - t0) * 1000

    def _stats(self, times_ms):
        """Summarise latency samples (milliseconds), rounded to 0.1 ms.

        Requires at least one sample. P95 uses the nearest-rank method, i.e.
        the ceil(0.95 * n)-th smallest sample.
        """
        s = sorted(times_ms)
        n = len(s)
        p95_idx = min(n - 1, math.ceil(0.95 * n) - 1)
        return {
            "mean_ms": round(statistics.mean(s), 1),
            "median_ms": round(statistics.median(s), 1),
            "p95_ms": round(s[p95_idx], 1),
            "min_ms": round(s[0], 1),
            "max_ms": round(s[-1], 1),
        }

    def _bench_health(self):
        """Time one GET /health call and record the readiness flags it reports."""
        self.stdout.write("[ 1/6 ] GPU server health check ...")
        try:
            data, ms = self._time_fn(lambda: self._req("GET", "/health"))
            ok = data.get("status") == "ok"
            self.results["health"] = {
                "status": "OK" if ok else "DEGRADED",
                "llm_ready": data.get("llm_ready", False),
                "embed_ready": data.get("embedding_ready", False),
                "latency_ms": round(ms, 1),
            }
            h = self.results["health"]
            self.stdout.write(
                f" {h['status']} | LLM: {'ready' if h['llm_ready'] else 'unloaded'} "
                f"| Embed: {'ready' if h['embed_ready'] else 'not ready'} | {ms:.0f} ms"
            )
        except Exception as exc:
            self.results["health"] = {"status": "ERROR", "error": str(exc)}
            self.stdout.write(self.style.ERROR(f" FAILED: {exc}"))

    def _bench_embeddings(self):
        """Time ``self.runs`` embedding calls for each of three query lengths.

        A failure for one query length is recorded as an ``error`` entry and
        the remaining lengths are still measured.
        """
        self.stdout.write(f"\n[ 2/6 ] Embedding latency ({self.runs} runs × 3 query lengths) ...")
        # Labels carry trailing spaces for console alignment; result keys are stripped.
        queries = {
            "short ": "What is onboarding?",
            "medium ": (
                "Explain the process for configuring access control policies for a new software engineer "
                "joining the platform team, including approval workflows and tool provisioning steps."
            ),
            "long ": (
                "A new hire on the infrastructure team needs to understand our CI/CD pipeline, deployment "
                "procedures, incident response protocols, monitoring dashboards, on-call rotation policy, "
                "and how to request access to production systems. Provide a comprehensive overview of all "
                "these areas including the relevant tools, key contacts, and escalation procedures they "
                "should be aware of during their first week and first month at the company."
            ),
        }
        embed_results = {}
        for label, query in queries.items():
            key = label.strip()
            times = []
            try:
                for _ in range(self.runs):
                    _, ms = self._time_fn(lambda q=query: self._req("POST", "/v1/embeddings", json={"input": q}))
                    times.append(ms)
            except Exception as exc:
                embed_results[key] = {"query_chars": len(query), "error": str(exc)}
                self.stdout.write(self.style.ERROR(f" {label}FAILED: {exc}"))
                continue
            st = self._stats(times)
            embed_results[key] = {"query_chars": len(query), **st}
            self.stdout.write(
                f" {label}({len(query):4d} chars) mean={st['mean_ms']:.0f} ms "
                f"p95={st['p95_ms']:.0f} ms min={st['min_ms']:.0f} ms max={st['max_ms']:.0f} ms"
            )
        self.results["embeddings"] = embed_results

    def _bench_chunking(self):
        """Time a single /v1/semantic-chunk call per input size and count the chunks returned."""
        self.stdout.write("\n[ 3/6 ] Semantic chunking latency ...")
        texts = {
            "small (~200 c)": "a " * 100,
            "medium (~2k c) ": (
                "This section covers the onboarding process for new employees joining the engineering team. "
                "You will learn about code review practices, deployment procedures, incident response, and "
                "team communication protocols. Each topic is covered in depth with examples and references "
                "to internal documentation. All engineers are expected to complete this module in week one. "
            ) * 5,
            "large (~8k c) ": (
                "The infrastructure team manages all cloud resources, CI/CD pipelines, and production environments. "
                "New members are expected to understand Kubernetes cluster management, Terraform IaC, "
                "GitLab CI pipeline authoring, monitoring with Grafana and Prometheus, and incident response procedures. "
                "This document provides a comprehensive guide to each area including runbooks and escalation paths. "
            ) * 20,
        }
        chunk_results = {}
        for label, text in texts.items():
            try:
                result, ms = self._time_fn(lambda t=text: self._req("POST", "/v1/semantic-chunk", json={"text": t}))
                n = len(result.get("chunks", []))
                chunk_results[label.strip()] = {"chars": len(text), "chunks_produced": n, "latency_ms": round(ms, 1)}
                self.stdout.write(f" {label} → {n} chunks | {ms:.0f} ms")
            except Exception as exc:
                chunk_results[label.strip()] = {"chars": len(text), "error": str(exc)}
                self.stdout.write(self.style.ERROR(f" {label} FAILED: {exc}"))
        self.results["chunking"] = chunk_results

    def _bench_llm(self):
        """Time one non-streaming chat completion per prompt and derive tokens/second.

        Token counts come from the response's ``usage`` block (0 if absent).
        """
        self.stdout.write("\n[ 4/6 ] LLM inference latency (each prompt is a single non-streaming call) ...")
        prompts = [
            {
                "label": "short_qa",
                "system": "You are an onboarding assistant.",
                "user": "What does a Kubernetes pod do? Answer in 2 sentences.",
                "max_tokens": 128,
            },
            {
                "label": "progress_summary",
                "system": "You are an onboarding assistant.",
                "user": (
                    "A trainee has completed: Git Basics, CI/CD Pipelines, Code Review. Score: 85%. "
                    "Write a 2-sentence progress summary."
                ),
                "max_tokens": 128,
            },
            {
                "label": "curriculum_gen",
                "system": "You are an onboarding assistant. Output only a valid JSON array of strings.",
                "user": (
                    "Create a 6-module onboarding curriculum for a Software Engineer role focused on "
                    "backend services. Output ONLY a JSON array of module title strings."
                ),
                "max_tokens": 256,
            },
            {
                "label": "assessment_gen",
                "system": "You are an onboarding assistant. Output only valid JSON.",
                "user": (
                    "Generate 3 multiple-choice questions to assess understanding of CI/CD pipelines. "
                    "Output as a JSON array of objects with keys: question, options (array of 4), answer."
                ),
                "max_tokens": 512,
            },
            {
                "label": "knowledge_explanation",
                "system": "You are an onboarding assistant.",
                "user": (
                    "Explain Git branching strategy best practices for a new engineer. "
                    "Cover: feature branches, naming conventions, merge vs rebase, and PR workflow. "
                    "Use clear headings and bullet points. Target ~400 words."
                ),
                "max_tokens": 700,
            },
        ]
        llm_results = {}
        for p in prompts:
            self.stdout.write(f" {p['label']} (max_tokens={p['max_tokens']}) ...", ending="")
            self.stdout.flush()
            try:
                t0 = time.perf_counter()
                data = self._req(
                    "POST",
                    "/v1/chat/completions",
                    json={
                        "messages": [
                            {"role": "system", "content": p["system"]},
                            {"role": "user", "content": p["user"]},
                        ],
                        "max_tokens": p["max_tokens"],
                        "stream": False,
                    },
                )
                elapsed_s = time.perf_counter() - t0
                usage = data.get("usage", {})
                ct = usage.get("completion_tokens", 0)
                pt = usage.get("prompt_tokens", 0)
                # Whole-request throughput: includes prompt processing and network time.
                tps = round(ct / elapsed_s, 1) if elapsed_s > 0 and ct > 0 else 0
                preview = (data["choices"][0]["message"]["content"] or "")[:100].replace("\n", " ")
                llm_results[p["label"]] = {
                    "elapsed_s": round(elapsed_s, 2),
                    "prompt_tokens": pt,
                    "completion_tokens": ct,
                    "tokens_per_sec": tps,
                    "response_preview": preview,
                }
                self.stdout.write(f" {elapsed_s:.1f} s | {ct} tokens | {tps} tok/s")
            except Exception as exc:
                llm_results[p["label"]] = {"error": str(exc)}
                self.stdout.write(self.style.ERROR(f" FAILED: {exc}"))
        self.results["llm"] = llm_results

    def _bench_database(self):
        """Record row counts for the main models, or mark the step skipped if tables are missing."""
        self.stdout.write("\n[ 5/6 ] Database statistics ...")
        # Cheap probe: if the knowledge table cannot be queried, migrations
        # are assumed not to have been applied.
        try:
            with connection.cursor() as cur:
                cur.execute("SELECT 1 FROM knowledge_knowledgechunk LIMIT 1")
        except Exception:
            self.stdout.write(self.style.WARNING(" Tables missing — run 'manage.py migrate' first. Skipping."))
            self.results["database"] = {"skipped": "Migrations not applied."}
            return

        try:
            self.results["database"] = {
                "organizations": Organization.objects.count(),
                "roles": Role.objects.count(),
                "users": User.objects.count(),
                "training_files_total": TrainingFile.objects.count(),
                "training_files_embedded": TrainingFile.objects.filter(status="embedded").count(),
                "knowledge_chunks_with_embeddings": KnowledgeChunk.objects.filter(embedding__isnull=False, is_active=True).count(),
                "onboarding_sessions": OnboardingSession.objects.count(),
            }
            d = self.results["database"]
            self.stdout.write(f" Orgs: {d['organizations']} | Roles: {d['roles']} | Users: {d['users']}")
            self.stdout.write(f" Training files: {d['training_files_total']} total ({d['training_files_embedded']} embedded)")
            self.stdout.write(f" Knowledge chunks (with embeddings): {d['knowledge_chunks_with_embeddings']}")
            self.stdout.write(f" Onboarding sessions: {d['onboarding_sessions']}")
        except Exception as exc:
            self.results["database"] = {"error": str(exc)}
            self.stdout.write(self.style.ERROR(f" FAILED: {exc}"))

    def _bench_retrieval(self):
        """Time cosine-distance top-k chunk retrieval for the first role with embedded chunks.

        The query vector is produced once via the embedding endpoint (not
        timed); only the database query is timed, ``self.runs`` times for
        each k in 5, 10, 20.
        """
        self.stdout.write(f"\n[ 6/6 ] pgvector retrieval latency ({self.runs} runs × top-k ∈ [5, 10, 20]) ...")
        try:
            role = Role.objects.filter(knowledge_chunks__embedding__isnull=False).distinct().first()
        except Exception as exc:
            self.stdout.write(self.style.WARNING(f" DB not ready ({exc}). Skipping."))
            self.results["retrieval"] = {"skipped": str(exc)}
            return

        if role is None:
            self.stdout.write(self.style.WARNING(" No role with embedded chunks — skipping."))
            self.results["retrieval"] = {"skipped": "No embedded chunks found in database."}
            return

        query = "What are the key responsibilities, tools, and procedures for this role?"
        self.stdout.write(f" Role: {role.name} (org: {role.organization.name})")
        self.stdout.write(f" Query: \"{query}\"")

        try:
            embed_data = self._req("POST", "/v1/embeddings", json={"input": query})
            query_vector = embed_data["data"][0]["embedding"]
        except Exception as exc:
            self.results["retrieval"] = {"error": f"Could not generate query embedding: {exc}"}
            self.stdout.write(self.style.ERROR(f" FAILED to get embedding: {exc}"))
            return

        retrieval_results = {}
        # A DB-side failure (e.g. the query vector's dimension not matching the
        # stored embeddings) is recorded rather than aborting the whole run.
        try:
            total_chunks = KnowledgeChunk.objects.filter(embedding__isnull=False, is_active=True).count()
            for top_k in (5, 10, 20):
                times = []
                n_returned = 0
                for _ in range(self.runs):
                    t0 = time.perf_counter()
                    # list() forces the queryset to execute inside the timed region.
                    chunks = list(
                        KnowledgeChunk.objects.filter(
                            organization=role.organization,
                            embedding__isnull=False,
                            is_active=True,
                        ).filter(
                            Q(role=role) | Q(role__isnull=True)
                        ).annotate(
                            distance=CosineDistance("embedding", query_vector)
                        ).order_by("distance")[:top_k]
                    )
                    times.append((time.perf_counter() - t0) * 1000)
                    n_returned = len(chunks)
                st = self._stats(times)
                retrieval_results[f"top_{top_k}"] = {"results_returned": n_returned, **st}
                self.stdout.write(
                    f" top-{top_k:2d}: mean={st['mean_ms']:.1f} ms "
                    f"p95={st['p95_ms']:.1f} ms min={st['min_ms']:.1f} ms max={st['max_ms']:.1f} ms"
                )
        except Exception as exc:
            self.results["retrieval"] = {"error": f"Retrieval query failed: {exc}"}
            self.stdout.write(self.style.ERROR(f" FAILED: {exc}"))
            return

        self.results["retrieval"] = {
            "role": role.name,
            "organization": role.organization.name,
            "query": query,
            "total_chunks_in_db": total_chunks,
            "results": retrieval_results,
        }

    def _print_summary(self):
        """Print a one-line console summary per benchmark that produced numbers."""
        self.stdout.write(self.style.SUCCESS("\n=== Summary ===\n"))

        h = self.results.get("health", {})
        self.stdout.write(f" GPU Server : {h.get('status', 'N/A')} — LLM {'ready' if h.get('llm_ready') else 'unloaded'}, embed {'ready' if h.get('embed_ready') else 'N/A'}")

        emb = self.results.get("embeddings", {})
        means = [v["mean_ms"] for v in emb.values() if "mean_ms" in v]
        if means:
            self.stdout.write(f" Embedding : {min(means):.0f}–{max(means):.0f} ms (mean across query lengths)")

        chnk = self.results.get("chunking", {})
        lats = [v["latency_ms"] for v in chnk.values() if "latency_ms" in v]
        if lats:
            self.stdout.write(f" Chunking : {min(lats):.0f}–{max(lats):.0f} ms range by text size")

        llm = self.results.get("llm", {})
        elapsed = [v["elapsed_s"] for v in llm.values() if "elapsed_s" in v]
        tps_all = [v["tokens_per_sec"] for v in llm.values() if "tokens_per_sec" in v and v["tokens_per_sec"] > 0]
        if elapsed:
            self.stdout.write(
                f" LLM inference : {min(elapsed):.1f}–{max(elapsed):.1f} s range"
                + (f" | {statistics.mean(tps_all):.1f} tok/s avg" if tps_all else "")
            )

        ret = self.results.get("retrieval", {})
        r5 = ret.get("results", {}).get("top_5", {})
        if r5.get("mean_ms"):
            self.stdout.write(f" RAG retrieval : {r5['mean_ms']:.1f} ms mean (top-5, {ret.get('total_chunks_in_db', '?')} total chunks)")

        db = self.results.get("database", {})
        if "knowledge_chunks_with_embeddings" in db:
            self.stdout.write(
                f" Knowledge base : {db['knowledge_chunks_with_embeddings']} chunks from "
                f"{db['training_files_embedded']} embedded files"
            )

    def _save_report(self):
        """Write ``self.results`` as a Markdown report (tables plus raw JSON) into ``self.out_dir``."""
        # Capture the time once so the filename and the "Date" line agree.
        now = datetime.datetime.now()
        ts = now.strftime("%Y-%m-%d_%H-%M-%S")
        path = self.out_dir / f"results_{ts}.md"

        # Trailing double spaces are Markdown hard line breaks.
        lines = [
            "# Dynavera Benchmark Results",
            "",
            f"**Date:** {now.strftime('%Y-%m-%d %H:%M:%S')}  ",
            f"**Inference endpoint:** `{settings.INFERENCE_URL}`  ",
            f"**Repetitions per benchmark:** {self.runs}  ",
            "",
        ]

        h = self.results.get("health", {})
        lines += [
            "## 1. GPU Server Health",
            "",
            "| Field | Value |",
            "|---|---|",
            f"| Status | {h.get('status', 'N/A')} |",
            f"| LLM Ready | {h.get('llm_ready', 'N/A')} |",
            f"| Embed Ready | {h.get('embed_ready', 'N/A')} |",
            f"| Health check RTT | {h.get('latency_ms', 'N/A')} ms |",
            "",
        ]

        emb = self.results.get("embeddings", {})
        if emb:
            lines += [
                "## 2. Embedding Latency",
                "",
                "| Query type | Chars | Mean (ms) | Median (ms) | P95 (ms) | Min (ms) | Max (ms) |",
                "|---|---|---|---|---|---|---|",
            ]
            for label, v in emb.items():
                if "mean_ms" in v:
                    lines.append(f"| {label} | {v['query_chars']} | {v['mean_ms']} | {v['median_ms']} | {v['p95_ms']} | {v['min_ms']} | {v['max_ms']} |")
                else:
                    lines.append(f"| {label} | {v.get('query_chars', '—')} | ERROR | — | — | — | — |")
            lines.append("")

        chnk = self.results.get("chunking", {})
        if chnk:
            lines += [
                "## 3. Semantic Chunking Latency",
                "",
                "| Input size | Chars | Chunks produced | Latency (ms) |",
                "|---|---|---|---|",
            ]
            for label, v in chnk.items():
                if "latency_ms" in v:
                    lines.append(f"| {label} | {v['chars']} | {v['chunks_produced']} | {v['latency_ms']} |")
                else:
                    lines.append(f"| {label} | {v.get('chars', '—')} | ERROR | — |")
            lines.append("")

        llm = self.results.get("llm", {})
        if llm:
            lines += [
                "## 4. LLM Inference Latency",
                "",
                "| Prompt type | Elapsed (s) | Prompt tokens | Completion tokens | Tok/s |",
                "|---|---|---|---|---|",
            ]
            for label, v in llm.items():
                if "elapsed_s" in v:
                    lines.append(
                        f"| {label} | {v['elapsed_s']} | {v['prompt_tokens']} | {v['completion_tokens']} | {v['tokens_per_sec']} |"
                    )
                else:
                    lines.append(f"| {label} | ERROR | — | — | — |")
            lines.append("")
            lines += [
                "> **Note on end-to-end session time:** A full onboarding session invokes multiple sequential",
                "> inference calls (curriculum generation → knowledge explanation × N modules → assessment generation → progress summary).",
                "> Total wall-clock time accumulates across all turns plus retrieval and tool-call overhead.",
                "",
            ]

        db = self.results.get("database", {})
        # Only render the table when counts were actually collected; a skipped
        # or failed step would otherwise produce a header-only table.
        if db and "error" not in db and "skipped" not in db:
            lines += [
                "## 5. Database Statistics",
                "",
                "| Entity | Count |",
                "|---|---|",
            ]
            labels = {
                "organizations": "Organizations",
                "roles": "Roles",
                "users": "Users",
                "training_files_total": "Training Files (total)",
                "training_files_embedded": "Training Files (embedded)",
                "knowledge_chunks_with_embeddings": "Knowledge Chunks (with embeddings)",
                "onboarding_sessions": "Onboarding Sessions",
            }
            for key, label in labels.items():
                if key in db:
                    lines.append(f"| {label} | {db[key]} |")
            lines.append("")

        ret = self.results.get("retrieval", {})
        if "results" in ret:
            lines += [
                "## 6. pgvector Retrieval Latency",
                "",
                f"**Role:** {ret.get('role')}  ",
                f"**Organisation:** {ret.get('organization')}  ",
                f'**Query:** "{ret.get("query")}"  ',
                f"**Total chunks in DB:** {ret.get('total_chunks_in_db')}  ",
                "",
                "| Top-K | Results returned | Mean (ms) | Median (ms) | P95 (ms) | Min (ms) | Max (ms) |",
                "|---|---|---|---|---|---|---|",
            ]
            for k, v in ret["results"].items():
                lines.append(
                    f"| {k} | {v['results_returned']} | {v['mean_ms']} | {v['median_ms']} | {v['p95_ms']} | {v['min_ms']} | {v['max_ms']} |"
                )
            lines.append("")

        lines += [
            "## Raw JSON",
            "",
            "```json",
            json.dumps(self.results, indent=2, default=str),
            "```",
            "",
        ]

        path.write_text("\n".join(lines), encoding="utf-8")
        self.stdout.write(self.style.SUCCESS(f"\nResults saved → {path}"))