484 lines
22 KiB
Python
484 lines
22 KiB
Python
import datetime
|
||
import json
|
||
import statistics
|
||
import time
|
||
from pathlib import Path
|
||
|
||
import httpx
|
||
from django.conf import settings
|
||
from django.core.management.base import BaseCommand
|
||
from django.db.models import Q
|
||
from pgvector.django import CosineDistance
|
||
|
||
from apps.accounts.models import Organization, Role, User
|
||
from apps.knowledge.models import KnowledgeChunk, TrainingFile
|
||
from apps.onboarding.models import OnboardingSession
|
||
|
||
|
||
class Command(BaseCommand):
|
||
help = "Benchmark Dynavera system components: GPU inference server, pgvector retrieval, and database."
|
||
|
||
def add_arguments(self, parser):
|
||
parser.add_argument("--runs", type=int, default=5, help="Repetitions per latency benchmark (default: 5)")
|
||
parser.add_argument("--out", type=str, default="benchmarks", help="Output directory for the results file (default: benchmarks/)")
|
||
parser.add_argument("--skip-llm", action="store_true", help="Skip LLM inference benchmarks (each prompt takes ~30 s)")
|
||
|
||
def handle(self, *args, **options):
|
||
self.runs = options["runs"]
|
||
self.skip_llm = options["skip_llm"]
|
||
self.out_dir = Path(options["out"])
|
||
self.out_dir.mkdir(exist_ok=True)
|
||
self.results = {}
|
||
|
||
self.stdout.write(self.style.SUCCESS("\n=== Dynavera System Benchmark ==="))
|
||
self.stdout.write(f" Inference endpoint : {settings.INFERENCE_URL}")
|
||
self.stdout.write(f" Repetitions : {self.runs}")
|
||
self.stdout.write(f" LLM benchmarks : {'SKIPPED (--skip-llm)' if self.skip_llm else 'ENABLED'}\n")
|
||
|
||
self._bench_health()
|
||
self._bench_embeddings()
|
||
self._bench_chunking()
|
||
if not self.skip_llm:
|
||
self._bench_llm()
|
||
self._bench_database()
|
||
self._bench_retrieval()
|
||
self._print_summary()
|
||
self._save_report()
|
||
|
||
def _req(self, method, path, **kwargs):
|
||
url = f"{settings.INFERENCE_URL}{path}"
|
||
resp = httpx.request(method, url, auth=settings.INFERENCE_AUTH, timeout=180, **kwargs)
|
||
resp.raise_for_status()
|
||
return resp.json()
|
||
|
||
def _time_fn(self, fn):
|
||
t0 = time.perf_counter()
|
||
result = fn()
|
||
return result, (time.perf_counter() - t0) * 1000
|
||
|
||
def _stats(self, times_ms):
|
||
s = sorted(times_ms)
|
||
n = len(s)
|
||
p95_idx = min(n - 1, int(-(-(0.95 * n) // 1)) - 1)
|
||
return {
|
||
"mean_ms": round(statistics.mean(s), 1),
|
||
"median_ms": round(statistics.median(s), 1),
|
||
"p95_ms": round(s[p95_idx], 1),
|
||
"min_ms": round(s[0], 1),
|
||
"max_ms": round(s[-1], 1),
|
||
}
|
||
|
||
def _bench_health(self):
|
||
self.stdout.write("[ 1/6 ] GPU server health check ...")
|
||
try:
|
||
data, ms = self._time_fn(lambda: self._req("GET", "/health"))
|
||
ok = data.get("status") == "ok"
|
||
self.results["health"] = {
|
||
"status": "OK" if ok else "DEGRADED",
|
||
"llm_ready": data.get("llm_ready", False),
|
||
"embed_ready": data.get("embedding_ready", False),
|
||
"latency_ms": round(ms, 1),
|
||
}
|
||
h = self.results["health"]
|
||
self.stdout.write(
|
||
f" {h['status']} | LLM: {'ready' if h['llm_ready'] else 'unloaded'} "
|
||
f"| Embed: {'ready' if h['embed_ready'] else 'not ready'} | {ms:.0f} ms"
|
||
)
|
||
except Exception as exc:
|
||
self.results["health"] = {"status": "ERROR", "error": str(exc)}
|
||
self.stdout.write(self.style.ERROR(f" FAILED: {exc}"))
|
||
|
||
def _bench_embeddings(self):
|
||
self.stdout.write(f"\n[ 2/6 ] Embedding latency ({self.runs} runs × 3 query lengths) ...")
|
||
queries = {
|
||
"short ": "What is onboarding?",
|
||
"medium ": (
|
||
"Explain the process for configuring access control policies for a new software engineer "
|
||
"joining the platform team, including approval workflows and tool provisioning steps."
|
||
),
|
||
"long ": (
|
||
"A new hire on the infrastructure team needs to understand our CI/CD pipeline, deployment "
|
||
"procedures, incident response protocols, monitoring dashboards, on-call rotation policy, "
|
||
"and how to request access to production systems. Provide a comprehensive overview of all "
|
||
"these areas including the relevant tools, key contacts, and escalation procedures they "
|
||
"should be aware of during their first week and first month at the company."
|
||
),
|
||
}
|
||
embed_results = {}
|
||
for label, query in queries.items():
|
||
times = []
|
||
for _ in range(self.runs):
|
||
_, ms = self._time_fn(lambda q=query: self._req("POST", "/v1/embeddings", json={"input": q}))
|
||
times.append(ms)
|
||
st = self._stats(times)
|
||
embed_results[label.strip()] = {"query_chars": len(query), **st}
|
||
self.stdout.write(
|
||
f" {label}({len(query):4d} chars) mean={st['mean_ms']:.0f} ms "
|
||
f"p95={st['p95_ms']:.0f} ms min={st['min_ms']:.0f} ms max={st['max_ms']:.0f} ms"
|
||
)
|
||
self.results["embeddings"] = embed_results
|
||
|
||
def _bench_chunking(self):
|
||
self.stdout.write("\n[ 3/6 ] Semantic chunking latency ...")
|
||
texts = {
|
||
"small (~200 c)": "a " * 100,
|
||
"medium (~2k c) ": (
|
||
"This section covers the onboarding process for new employees joining the engineering team. "
|
||
"You will learn about code review practices, deployment procedures, incident response, and "
|
||
"team communication protocols. Each topic is covered in depth with examples and references "
|
||
"to internal documentation. All engineers are expected to complete this module in week one. "
|
||
) * 5,
|
||
"large (~8k c) ": (
|
||
"The infrastructure team manages all cloud resources, CI/CD pipelines, and production environments. "
|
||
"New members are expected to understand Kubernetes cluster management, Terraform IaC, "
|
||
"GitLab CI pipeline authoring, monitoring with Grafana and Prometheus, and incident response procedures. "
|
||
"This document provides a comprehensive guide to each area including runbooks and escalation paths. "
|
||
) * 20,
|
||
}
|
||
chunk_results = {}
|
||
for label, text in texts.items():
|
||
try:
|
||
result, ms = self._time_fn(lambda t=text: self._req("POST", "/v1/semantic-chunk", json={"text": t}))
|
||
n = len(result.get("chunks", []))
|
||
chunk_results[label.strip()] = {"chars": len(text), "chunks_produced": n, "latency_ms": round(ms, 1)}
|
||
self.stdout.write(f" {label} → {n} chunks | {ms:.0f} ms")
|
||
except Exception as exc:
|
||
chunk_results[label.strip()] = {"error": str(exc)}
|
||
self.stdout.write(self.style.ERROR(f" {label} FAILED: {exc}"))
|
||
self.results["chunking"] = chunk_results
|
||
|
||
def _bench_llm(self):
|
||
self.stdout.write("\n[ 4/6 ] LLM inference latency (each prompt is a single non-streaming call) ...")
|
||
prompts = [
|
||
{
|
||
"label": "short_qa",
|
||
"system": "You are an onboarding assistant.",
|
||
"user": "What does a Kubernetes pod do? Answer in 2 sentences.",
|
||
"max_tokens": 128,
|
||
},
|
||
{
|
||
"label": "progress_summary",
|
||
"system": "You are an onboarding assistant.",
|
||
"user": (
|
||
"A trainee has completed: Git Basics, CI/CD Pipelines, Code Review. Score: 85%. "
|
||
"Write a 2-sentence progress summary."
|
||
),
|
||
"max_tokens": 128,
|
||
},
|
||
{
|
||
"label": "curriculum_gen",
|
||
"system": "You are an onboarding assistant. Output only a valid JSON array of strings.",
|
||
"user": (
|
||
"Create a 6-module onboarding curriculum for a Software Engineer role focused on "
|
||
"backend services. Output ONLY a JSON array of module title strings."
|
||
),
|
||
"max_tokens": 256,
|
||
},
|
||
{
|
||
"label": "assessment_gen",
|
||
"system": "You are an onboarding assistant. Output only valid JSON.",
|
||
"user": (
|
||
"Generate 3 multiple-choice questions to assess understanding of CI/CD pipelines. "
|
||
"Output as a JSON array of objects with keys: question, options (array of 4), answer."
|
||
),
|
||
"max_tokens": 512,
|
||
},
|
||
{
|
||
"label": "knowledge_explanation",
|
||
"system": "You are an onboarding assistant.",
|
||
"user": (
|
||
"Explain Git branching strategy best practices for a new engineer. "
|
||
"Cover: feature branches, naming conventions, merge vs rebase, and PR workflow. "
|
||
"Use clear headings and bullet points. Target ~400 words."
|
||
),
|
||
"max_tokens": 700,
|
||
},
|
||
]
|
||
llm_results = {}
|
||
for p in prompts:
|
||
self.stdout.write(f" {p['label']} (max_tokens={p['max_tokens']}) ...", ending="")
|
||
self.stdout.flush()
|
||
try:
|
||
t0 = time.perf_counter()
|
||
data = self._req(
|
||
"POST",
|
||
"/v1/chat/completions",
|
||
json={
|
||
"messages": [
|
||
{"role": "system", "content": p["system"]},
|
||
{"role": "user", "content": p["user"]},
|
||
],
|
||
"max_tokens": p["max_tokens"],
|
||
"stream": False,
|
||
},
|
||
)
|
||
elapsed_s = time.perf_counter() - t0
|
||
usage = data.get("usage", {})
|
||
ct = usage.get("completion_tokens", 0)
|
||
pt = usage.get("prompt_tokens", 0)
|
||
tps = round(ct / elapsed_s, 1) if elapsed_s > 0 and ct > 0 else 0
|
||
preview = (data["choices"][0]["message"]["content"] or "")[:100].replace("\n", " ")
|
||
llm_results[p["label"]] = {
|
||
"elapsed_s": round(elapsed_s, 2),
|
||
"prompt_tokens": pt,
|
||
"completion_tokens": ct,
|
||
"tokens_per_sec": tps,
|
||
"response_preview": preview,
|
||
}
|
||
self.stdout.write(f" {elapsed_s:.1f} s | {ct} tokens | {tps} tok/s")
|
||
except Exception as exc:
|
||
llm_results[p["label"]] = {"error": str(exc)}
|
||
self.stdout.write(self.style.ERROR(f" FAILED: {exc}"))
|
||
self.results["llm"] = llm_results
|
||
|
||
def _bench_database(self):
|
||
self.stdout.write("\n[ 5/6 ] Database statistics ...")
|
||
try:
|
||
from django.db import connection
|
||
with connection.cursor() as cur:
|
||
cur.execute("SELECT 1 FROM knowledge_knowledgechunk LIMIT 1")
|
||
except Exception:
|
||
self.stdout.write(self.style.WARNING(" Tables missing — run 'manage.py migrate' first. Skipping."))
|
||
self.results["database"] = {"skipped": "Migrations not applied."}
|
||
return
|
||
try:
|
||
self.results["database"] = {
|
||
"organizations": Organization.objects.count(),
|
||
"roles": Role.objects.count(),
|
||
"users": User.objects.count(),
|
||
"training_files_total": TrainingFile.objects.count(),
|
||
"training_files_embedded": TrainingFile.objects.filter(status="embedded").count(),
|
||
"knowledge_chunks_with_embeddings": KnowledgeChunk.objects.filter(embedding__isnull=False, is_active=True).count(),
|
||
"onboarding_sessions": OnboardingSession.objects.count(),
|
||
}
|
||
d = self.results["database"]
|
||
self.stdout.write(f" Orgs: {d['organizations']} | Roles: {d['roles']} | Users: {d['users']}")
|
||
self.stdout.write(f" Training files: {d['training_files_total']} total ({d['training_files_embedded']} embedded)")
|
||
self.stdout.write(f" Knowledge chunks (with embeddings): {d['knowledge_chunks_with_embeddings']}")
|
||
self.stdout.write(f" Onboarding sessions: {d['onboarding_sessions']}")
|
||
except Exception as exc:
|
||
self.results["database"] = {"error": str(exc)}
|
||
self.stdout.write(self.style.ERROR(f" FAILED: {exc}"))
|
||
|
||
def _bench_retrieval(self):
|
||
self.stdout.write(f"\n[ 6/6 ] pgvector retrieval latency ({self.runs} runs × top-k ∈ [5, 10, 20]) ...")
|
||
try:
|
||
role = Role.objects.filter(knowledge_chunks__embedding__isnull=False).distinct().first()
|
||
except Exception as exc:
|
||
self.stdout.write(self.style.WARNING(f" DB not ready ({exc}). Skipping."))
|
||
self.results["retrieval"] = {"skipped": str(exc)}
|
||
return
|
||
if role is None:
|
||
self.stdout.write(self.style.WARNING(" No role with embedded chunks — skipping."))
|
||
self.results["retrieval"] = {"skipped": "No embedded chunks found in database."}
|
||
return
|
||
|
||
query = "What are the key responsibilities, tools, and procedures for this role?"
|
||
self.stdout.write(f" Role: {role.name} (org: {role.organization.name})")
|
||
self.stdout.write(f" Query: \"{query}\"")
|
||
|
||
try:
|
||
embed_data = self._req("POST", "/v1/embeddings", json={"input": query})
|
||
query_vector = embed_data["data"][0]["embedding"]
|
||
except Exception as exc:
|
||
self.results["retrieval"] = {"error": f"Could not generate query embedding: {exc}"}
|
||
self.stdout.write(self.style.ERROR(f" FAILED to get embedding: {exc}"))
|
||
return
|
||
|
||
total_chunks = KnowledgeChunk.objects.filter(embedding__isnull=False, is_active=True).count()
|
||
retrieval_results = {}
|
||
for top_k in [5, 10, 20]:
|
||
times = []
|
||
n_returned = 0
|
||
for _ in range(self.runs):
|
||
t0 = time.perf_counter()
|
||
chunks = list(
|
||
KnowledgeChunk.objects.filter(
|
||
organization=role.organization,
|
||
embedding__isnull=False,
|
||
is_active=True,
|
||
).filter(
|
||
Q(role=role) | Q(role__isnull=True)
|
||
).annotate(
|
||
distance=CosineDistance("embedding", query_vector)
|
||
).order_by("distance")[:top_k]
|
||
)
|
||
times.append((time.perf_counter() - t0) * 1000)
|
||
n_returned = len(chunks)
|
||
st = self._stats(times)
|
||
retrieval_results[f"top_{top_k}"] = {"results_returned": n_returned, **st}
|
||
self.stdout.write(
|
||
f" top-{top_k:2d}: mean={st['mean_ms']:.1f} ms "
|
||
f"p95={st['p95_ms']:.1f} ms min={st['min_ms']:.1f} ms max={st['max_ms']:.1f} ms"
|
||
)
|
||
self.results["retrieval"] = {
|
||
"role": role.name,
|
||
"organization": role.organization.name,
|
||
"query": query,
|
||
"total_chunks_in_db": total_chunks,
|
||
"results": retrieval_results,
|
||
}
|
||
|
||
def _print_summary(self):
|
||
self.stdout.write(self.style.SUCCESS("\n=== Summary ===\n"))
|
||
h = self.results.get("health", {})
|
||
self.stdout.write(f" GPU Server : {h.get('status', 'N/A')} — LLM {'ready' if h.get('llm_ready') else 'unloaded'}, embed {'ready' if h.get('embed_ready') else 'N/A'}")
|
||
|
||
emb = self.results.get("embeddings", {})
|
||
means = [v["mean_ms"] for v in emb.values() if "mean_ms" in v]
|
||
if means:
|
||
self.stdout.write(f" Embedding : {min(means):.0f}–{max(means):.0f} ms (mean across query lengths)")
|
||
|
||
chnk = self.results.get("chunking", {})
|
||
lats = [v["latency_ms"] for v in chnk.values() if "latency_ms" in v]
|
||
if lats:
|
||
self.stdout.write(f" Chunking : {min(lats):.0f}–{max(lats):.0f} ms range by text size")
|
||
|
||
llm = self.results.get("llm", {})
|
||
elapsed = [v["elapsed_s"] for v in llm.values() if "elapsed_s" in v]
|
||
tps_all = [v["tokens_per_sec"] for v in llm.values() if "tokens_per_sec" in v and v["tokens_per_sec"] > 0]
|
||
if elapsed:
|
||
self.stdout.write(
|
||
f" LLM inference : {min(elapsed):.1f}–{max(elapsed):.1f} s range"
|
||
+ (f" | {statistics.mean(tps_all):.1f} tok/s avg" if tps_all else "")
|
||
)
|
||
|
||
ret = self.results.get("retrieval", {})
|
||
r5 = ret.get("results", {}).get("top_5", {})
|
||
if r5.get("mean_ms"):
|
||
self.stdout.write(f" RAG retrieval : {r5['mean_ms']:.1f} ms mean (top-5, {ret.get('total_chunks_in_db', '?')} total chunks)")
|
||
|
||
db = self.results.get("database", {})
|
||
if "knowledge_chunks_with_embeddings" in db:
|
||
self.stdout.write(
|
||
f" Knowledge base : {db['knowledge_chunks_with_embeddings']} chunks from "
|
||
f"{db['training_files_embedded']} embedded files"
|
||
)
|
||
|
||
def _save_report(self):
|
||
ts = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
|
||
path = self.out_dir / f"results_{ts}.md"
|
||
|
||
lines = [
|
||
"# Dynavera Benchmark Results",
|
||
"",
|
||
f"**Date:** {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ",
|
||
f"**Inference endpoint:** `{settings.INFERENCE_URL}` ",
|
||
f"**Repetitions per benchmark:** {self.runs} ",
|
||
"",
|
||
]
|
||
|
||
h = self.results.get("health", {})
|
||
lines += [
|
||
"## 1. GPU Server Health",
|
||
"",
|
||
"| Field | Value |",
|
||
"|---|---|",
|
||
f"| Status | {h.get('status', 'N/A')} |",
|
||
f"| LLM Ready | {h.get('llm_ready', 'N/A')} |",
|
||
f"| Embed Ready | {h.get('embed_ready', 'N/A')} |",
|
||
f"| Health check RTT | {h.get('latency_ms', 'N/A')} ms |",
|
||
"",
|
||
]
|
||
|
||
emb = self.results.get("embeddings", {})
|
||
if emb:
|
||
lines += [
|
||
"## 2. Embedding Latency",
|
||
"",
|
||
"| Query type | Chars | Mean (ms) | Median (ms) | P95 (ms) | Min (ms) | Max (ms) |",
|
||
"|---|---|---|---|---|---|---|",
|
||
]
|
||
for label, v in emb.items():
|
||
if "mean_ms" in v:
|
||
lines.append(f"| {label} | {v['query_chars']} | {v['mean_ms']} | {v['median_ms']} | {v['p95_ms']} | {v['min_ms']} | {v['max_ms']} |")
|
||
lines.append("")
|
||
|
||
chnk = self.results.get("chunking", {})
|
||
if chnk:
|
||
lines += [
|
||
"## 3. Semantic Chunking Latency",
|
||
"",
|
||
"| Input size | Chars | Chunks produced | Latency (ms) |",
|
||
"|---|---|---|---|",
|
||
]
|
||
for label, v in chnk.items():
|
||
if "latency_ms" in v:
|
||
lines.append(f"| {label} | {v['chars']} | {v['chunks_produced']} | {v['latency_ms']} |")
|
||
lines.append("")
|
||
|
||
llm = self.results.get("llm", {})
|
||
if llm:
|
||
lines += [
|
||
"## 4. LLM Inference Latency",
|
||
"",
|
||
"| Prompt type | Elapsed (s) | Prompt tokens | Completion tokens | Tok/s |",
|
||
"|---|---|---|---|---|",
|
||
]
|
||
for label, v in llm.items():
|
||
if "elapsed_s" in v:
|
||
lines.append(
|
||
f"| {label} | {v['elapsed_s']} | {v['prompt_tokens']} | {v['completion_tokens']} | {v['tokens_per_sec']} |"
|
||
)
|
||
else:
|
||
lines.append(f"| {label} | ERROR | — | — | — |")
|
||
lines.append("")
|
||
lines += [
|
||
"> **Note on end-to-end session time:** A full onboarding session invokes multiple sequential",
|
||
"> inference calls (curriculum generation → knowledge explanation × N modules → assessment generation → progress summary).",
|
||
"> Total wall-clock time accumulates across all turns plus retrieval and tool-call overhead.",
|
||
"",
|
||
]
|
||
|
||
db = self.results.get("database", {})
|
||
if db and "error" not in db:
|
||
lines += [
|
||
"## 5. Database Statistics",
|
||
"",
|
||
"| Entity | Count |",
|
||
"|---|---|",
|
||
]
|
||
labels = {
|
||
"organizations": "Organizations",
|
||
"roles": "Roles",
|
||
"users": "Users",
|
||
"training_files_total": "Training Files (total)",
|
||
"training_files_embedded": "Training Files (embedded)",
|
||
"knowledge_chunks_with_embeddings": "Knowledge Chunks (with embeddings)",
|
||
"onboarding_sessions": "Onboarding Sessions",
|
||
}
|
||
for key, label in labels.items():
|
||
if key in db:
|
||
lines.append(f"| {label} | {db[key]} |")
|
||
lines.append("")
|
||
|
||
ret = self.results.get("retrieval", {})
|
||
if "results" in ret:
|
||
lines += [
|
||
"## 6. pgvector Retrieval Latency",
|
||
"",
|
||
f"**Role:** {ret.get('role')} ",
|
||
f"**Organisation:** {ret.get('organization')} ",
|
||
f'**Query:** "{ret.get("query")}" ',
|
||
f"**Total chunks in DB:** {ret.get('total_chunks_in_db')} ",
|
||
"",
|
||
"| Top-K | Results returned | Mean (ms) | Median (ms) | P95 (ms) | Min (ms) | Max (ms) |",
|
||
"|---|---|---|---|---|---|---|",
|
||
]
|
||
for k, v in ret["results"].items():
|
||
lines.append(
|
||
f"| {k} | {v['results_returned']} | {v['mean_ms']} | {v['median_ms']} | {v['p95_ms']} | {v['min_ms']} | {v['max_ms']} |"
|
||
)
|
||
lines.append("")
|
||
|
||
lines += [
|
||
"## Raw JSON",
|
||
"",
|
||
"```json",
|
||
json.dumps(self.results, indent=2, default=str),
|
||
"```",
|
||
"",
|
||
]
|
||
|
||
path.write_text("\n".join(lines), encoding="utf-8")
|
||
self.stdout.write(self.style.SUCCESS(f"\nResults saved → {path}"))
|