import hashlib import logging from celery import shared_task from django.conf import settings from django.db import transaction from docx import Document from httpx import Client, Timeout from pypdf import PdfReader from apps.knowledge.models import RoleRagDocument, TrainingFile logger = logging.getLogger(__name__) def _decode_text_bytes(raw_bytes: bytes) -> str: try: return raw_bytes.decode('utf-8') except UnicodeDecodeError: return raw_bytes.decode('latin-1', errors='ignore') def _extract_text_from_training_file(file_obj: TrainingFile) -> str: file_name = (file_obj.file_name or '').lower() if file_name.endswith('.pdf'): with file_obj.file.open('rb') as f: reader = PdfReader(f) pages = [page.extract_text() or '' for page in reader.pages] return '\n'.join(pages).strip() if file_name.endswith('.docx'): with file_obj.file.open('rb') as f: document = Document(f) paragraphs = [paragraph.text for paragraph in document.paragraphs if paragraph.text] return '\n'.join(paragraphs).strip() with file_obj.file.open('rb') as f: raw_bytes = f.read() return _decode_text_bytes(raw_bytes).strip() def _get_text_chunks(text: str, size: int = 10000): for i in range(0, len(text), size): yield text[i:i + size] @shared_task(name="apps.knowledge.tasks.ingest_training_file_task", bind=True, soft_time_limit=900, time_limit=1200) def ingest_training_file_task(self, file_uuid): """ Ingests a training file by extracting text, chunking it, generating embeddings via an external service, and saving RoleRagDocument entries. Updates the file status accordingly and triggers prompt refinement. """ try: file_obj = TrainingFile.objects.get(uuid=file_uuid) except TrainingFile.DoesNotExist: return f"File {file_uuid} not found." file_obj.status = 'ingesting' file_obj.save() try: raw_text = _extract_text_from_training_file(file_obj) if not raw_text: raise ValueError('No extractable text found.') all_documents = [] chunk_counter = 0 timeout = Timeout(60.0) with Client(timeout=timeout) as client: for text_segment in _get_text_chunks(raw_text): response = client.post( settings.INFERENCE_SEMANTIC_CHUNK_ENDPOINT, json={ "text": text_segment, "threshold": 95, }, ) response.raise_for_status() result = response.json() chunks = result['chunks'] embeddings = result['embeddings'] for chunk_text, embedding in zip(chunks, embeddings): all_documents.append(RoleRagDocument( organization=file_obj.organization, role=file_obj.role, training_file=file_obj, content=chunk_text, content_hash=hashlib.sha256(chunk_text.encode('utf-8')).hexdigest(), embedding=embedding, chunk_index=chunk_counter, metadata={ "source": file_obj.file_name, "file_name": file_obj.file_name, "scope": "role" if file_obj.role_id else "organization", }, )) chunk_counter += 1 with transaction.atomic(): RoleRagDocument.objects.bulk_create(all_documents) file_obj.status = 'embedded' file_obj.is_processed = True file_obj.save() if file_obj.role_id: update_agent_prompts_from_file_task.delay(str(file_obj.role.uuid)) return f"Processed {chunk_counter} chunks via batching." except Exception as e: file_obj.status = 'failed' file_obj.description = str(e) file_obj.save() raise e @shared_task(name="apps.knowledge.tasks.update_agent_prompts_from_file_task", bind=True, soft_time_limit=120, time_limit=180) def update_agent_prompts_from_file_task(self, role_uuid: str): """ After a training file is ingested (or deleted), refine the curriculum AgentConfig system prompt using document content. Resets to the canonical base prompt when no files remain. """ from apps.accounts.models import Role from apps.onboarding.consumers.prompts import OnboardingPrompts from apps.onboarding.models import AgentConfig try: role = Role.objects.get(uuid=role_uuid) except Role.DoesNotExist: logger.warning("update_agent_prompts_from_file_task: role %s not found", role_uuid) return curriculum_config = AgentConfig.objects.filter(role=role, agent_type='curriculum').first() if not curriculum_config: logger.warning("update_agent_prompts_from_file_task: no curriculum config for role %s", role_uuid) return chunk_texts = list( RoleRagDocument.objects.filter(role=role, is_active=True) .order_by('training_file_id', 'chunk_index') .values_list('content', flat=True)[:30] ) # No files left... so we should reset if not chunk_texts: curriculum_config.system_prompt = OnboardingPrompts.default_curriculum_prompt(role.name) curriculum_config.save(update_fields=['system_prompt', 'updated_at']) logger.info("update_agent_prompts_from_file_task: reset to base prompt for role %s", role_uuid) return combined_text = '\n\n'.join(chunk_texts)[:6000] base_prompt = OnboardingPrompts.default_curriculum_prompt(role.name) try: with Client(timeout=Timeout(60.0)) as client: response = client.post( settings.INFERENCE_CHAT_COMPLETIONS_ENDPOINT, json={ "model": "meta-llama-3.1-8b-instruct", "messages": [ { "role": "user", "content": OnboardingPrompts.refine_curriculum_prompt( role.name, base_prompt, combined_text ), }, ], "max_tokens": 600, }, ) response.raise_for_status() refined_prompt = response.json()["choices"][0]["message"]["content"].strip() except Exception as e: logger.exception("update_agent_prompts_from_file_task: LLM call failed for role %s: %s", role_uuid, e) return curriculum_config.system_prompt = refined_prompt curriculum_config.save(update_fields=['system_prompt', 'updated_at']) logger.info("update_agent_prompts_from_file_task: refined curriculum prompt for role %s", role_uuid)