200 lines
7.7 KiB
Python
200 lines
7.7 KiB
Python
import hashlib
|
|
import logging
|
|
|
|
from celery import shared_task
|
|
from django.conf import settings
|
|
from django.db import transaction
|
|
from docx import Document
|
|
from httpx import Client, Timeout
|
|
from pypdf import PdfReader
|
|
|
|
from apps.knowledge.models import KnowledgeChunk, TrainingFile
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _decode_text_bytes(raw_bytes: bytes) -> str:
|
|
try:
|
|
return raw_bytes.decode('utf-8')
|
|
except UnicodeDecodeError:
|
|
return raw_bytes.decode('latin-1', errors='ignore')
|
|
|
|
def _extract_text_from_training_file(file_obj: TrainingFile) -> str:
    """Return the stripped text content of a training file.

    The extraction strategy is chosen from the lower-cased ``file_name``
    suffix:

    * ``.pdf``  - text of every page (via ``PdfReader``), joined by newlines;
      a page with no extractable text contributes an empty string.
    * ``.docx`` - text of every non-empty paragraph (via ``Document``),
      joined by newlines.
    * anything else - the raw bytes, decoded by ``_decode_text_bytes``.

    Args:
        file_obj: The training file whose stored ``file`` is read.

    Returns:
        The extracted text with leading/trailing whitespace removed.
    """
    lowered_name = (file_obj.file_name or '').lower()

    with file_obj.file.open('rb') as stream:
        if lowered_name.endswith('.pdf'):
            pieces = []
            for page in PdfReader(stream).pages:
                pieces.append(page.extract_text() or '')
            extracted = '\n'.join(pieces)
        elif lowered_name.endswith('.docx'):
            pieces = []
            for para in Document(stream).paragraphs:
                para_text = para.text
                if para_text:
                    pieces.append(para_text)
            extracted = '\n'.join(pieces)
        else:
            extracted = _decode_text_bytes(stream.read())

    return extracted.strip()
|
|
|
|
def _get_text_chunks(text: str, size: int = 10000):
|
|
for i in range(0, len(text), size):
|
|
yield text[i:i + size]
|
|
|
|
@shared_task(name="apps.knowledge.tasks.ingest_training_file_task", bind=True, soft_time_limit=900, time_limit=1200)
def ingest_training_file_task(self, file_uuid):
    """
    Ingest a training file by extracting its text, sending it in segments to
    the semantic-chunk inference endpoint, and saving the returned chunks and
    embeddings as KnowledgeChunk entries.

    The file status is set to 'ingesting' at the start, 'embedded' on success
    and 'failed' on any error (the error text is stored in ``description``).
    When the file is attached to a role, prompt refinement is queued after a
    successful ingest.

    Args:
        file_uuid: UUID of the TrainingFile to ingest.

    Returns:
        A human-readable summary string, or a "not found" message when no
        TrainingFile with that UUID exists.

    Raises:
        ValueError: If no text can be extracted, or if the inference service
            returns a different number of chunks and embeddings.
        Exception: Any other error is re-raised after the file is marked
            'failed'.
    """
    try:
        file_obj = TrainingFile.objects.get(uuid=file_uuid)
    except TrainingFile.DoesNotExist:
        return f"File {file_uuid} not found."

    file_obj.status = 'ingesting'
    file_obj.save()

    try:
        raw_text = _extract_text_from_training_file(file_obj)
        if not raw_text:
            raise ValueError('No extractable text found.')

        all_documents = []
        chunk_counter = 0
        # Loop-invariant: the file's role does not change during ingestion.
        scope = "role" if file_obj.role_id else "organization"

        with Client(timeout=Timeout(60.0)) as client:
            for text_segment in _get_text_chunks(raw_text):
                response = client.post(
                    settings.INFERENCE_SEMANTIC_CHUNK_ENDPOINT,
                    json={
                        "text": text_segment,
                        "threshold": 95,
                    },
                )
                response.raise_for_status()
                result = response.json()

                chunks = result['chunks']
                embeddings = result['embeddings']

                # zip() stops at the shorter input, which would silently drop
                # chunks (or embeddings) on a malformed response. Fail loudly
                # so the file is marked 'failed' instead of half-ingested.
                if len(chunks) != len(embeddings):
                    raise ValueError(
                        f'Inference service returned {len(chunks)} chunks '
                        f'but {len(embeddings)} embeddings.'
                    )

                for chunk_text, embedding in zip(chunks, embeddings):
                    all_documents.append(KnowledgeChunk(
                        organization=file_obj.organization,
                        role=file_obj.role,
                        training_file=file_obj,
                        content=chunk_text,
                        content_hash=hashlib.sha256(chunk_text.encode('utf-8')).hexdigest(),
                        embedding=embedding,
                        chunk_index=chunk_counter,
                        metadata={
                            "source": file_obj.file_name,
                            "file_name": file_obj.file_name,
                            "scope": scope,
                        },
                    ))
                    chunk_counter += 1

        # Skip chunks whose hash is already stored for this file, and also
        # chunks repeated within this run, so identical content is inserted
        # at most once per file.
        seen_hashes = set(
            KnowledgeChunk.objects.filter(training_file=file_obj).values_list('content_hash', flat=True)
        )
        new_documents = []
        for document in all_documents:
            if document.content_hash in seen_hashes:
                continue
            seen_hashes.add(document.content_hash)
            new_documents.append(document)

        # Insert the chunks and flip the status in one transaction so the
        # file is never reported 'embedded' without its chunks, or vice versa.
        with transaction.atomic():
            KnowledgeChunk.objects.bulk_create(new_documents)
            file_obj.status = 'embedded'
            file_obj.save()

        if file_obj.role_id:
            update_agent_prompts_from_file_task.delay(str(file_obj.role.uuid))

        return f"Processed {chunk_counter} chunks via batching."

    except Exception as e:
        # Record the failure on the file, then let the task fail normally.
        file_obj.status = 'failed'
        file_obj.description = str(e)
        file_obj.save()
        raise
|
|
|
|
|
|
@shared_task(name="apps.knowledge.tasks.update_agent_prompts_from_file_task", bind=True, soft_time_limit=120, time_limit=180)
def update_agent_prompts_from_file_task(self, role_uuid: str):
    """
    After a training file is ingested (or deleted), refine the curriculum and assessment
    AgentConfig system prompts using document content. Resets to canonical base prompts
    when no files remain.

    Only AgentConfig rows that already exist for the role are updated; none
    are created. Failures of the LLM call are logged and swallowed, leaving
    the not-yet-updated prompts as they were.

    Args:
        role_uuid: UUID (as a string) of the Role whose agent prompts are refreshed.

    Returns:
        None.
    """
    from apps.accounts.models import Role
    from apps.onboarding.consumers.prompts import OnboardingPrompts
    from apps.onboarding.models import AgentConfig

    try:
        role = Role.objects.get(uuid=role_uuid)
    except Role.DoesNotExist:
        logger.warning("update_agent_prompts_from_file_task: role %s not found", role_uuid)
        return

    # Existing configs for this role, keyed by agent type.
    configs = {
        cfg.agent_type: cfg
        for cfg in AgentConfig.objects.filter(role=role, agent_type__in=['curriculum', 'assessment'])
    }

    # At most the first 30 active chunks, in file/chunk order.
    chunk_texts = list(
        KnowledgeChunk.objects.filter(role=role, is_active=True)
        .order_by('training_file_id', 'chunk_index')
        .values_list('content', flat=True)[:30]
    )

    # No files left — reset both to their canonical base prompts
    if not chunk_texts:
        to_update = []
        if 'curriculum' in configs:
            configs['curriculum'].system_prompt = OnboardingPrompts.default_curriculum_prompt(role.name)
            to_update.append(configs['curriculum'])
        if 'assessment' in configs:
            configs['assessment'].system_prompt = OnboardingPrompts.default_assessment_prompt(role.name)
            to_update.append(configs['assessment'])
        for cfg in to_update:
            cfg.save(update_fields=['system_prompt', 'updated_at'])
        logger.info("update_agent_prompts_from_file_task: reset to base prompts for role %s", role_uuid)
        return

    # Document excerpt handed to the LLM, capped at 6000 characters.
    combined_text = '\n\n'.join(chunk_texts)[:6000]

    refine_calls = [
        (
            'curriculum',
            OnboardingPrompts.refine_curriculum_prompt(
                role.name, OnboardingPrompts.default_curriculum_prompt(role.name), combined_text
            ),
        ),
        (
            'assessment',
            OnboardingPrompts.refine_assessment_prompt(
                role.name, OnboardingPrompts.default_assessment_prompt(role.name), combined_text
            ),
        ),
    ]

    try:
        with Client(timeout=Timeout(60.0)) as client:
            for agent_type, user_prompt in refine_calls:
                if agent_type not in configs:
                    continue
                response = client.post(
                    settings.INFERENCE_CHAT_COMPLETIONS_ENDPOINT,
                    json={
                        "model": "meta-llama-3.1-8b-instruct",
                        "messages": [{"role": "user", "content": user_prompt}],
                        "max_tokens": 600,
                    },
                )
                response.raise_for_status()
                refined_prompt = (response.json()["choices"][0]["message"]["content"] or "").strip()
                # Never replace a working system prompt with a blank one: an
                # empty or null completion keeps the stored prompt untouched.
                if not refined_prompt:
                    logger.warning(
                        "update_agent_prompts_from_file_task: empty %s prompt returned for role %s; "
                        "keeping existing prompt",
                        agent_type,
                        role_uuid,
                    )
                    continue
                configs[agent_type].system_prompt = refined_prompt
                configs[agent_type].save(update_fields=['system_prompt', 'updated_at'])
                logger.info("update_agent_prompts_from_file_task: refined %s prompt for role %s", agent_type, role_uuid)
    except Exception as e:
        # Best-effort refinement: log and return rather than failing the task.
        logger.exception("update_agent_prompts_from_file_task: LLM call failed for role %s: %s", role_uuid, e)
|