2026-02-26 01:32:04 +00:00
|
|
|
import hashlib
|
|
|
|
|
|
|
|
|
|
from celery import shared_task
|
|
|
|
|
from django.conf import settings
|
2026-03-08 13:10:49 +00:00
|
|
|
from django.db import transaction
|
|
|
|
|
from docx import Document
|
|
|
|
|
from httpx import Client, Timeout
|
|
|
|
|
from pypdf import PdfReader
|
2026-02-26 01:32:04 +00:00
|
|
|
|
2026-03-08 13:10:49 +00:00
|
|
|
from apps.knowledge.models import RoleRagDocument, TrainingFile
|
2026-02-26 01:32:04 +00:00
|
|
|
|
2026-03-08 13:16:26 +00:00
|
|
|
|
2026-02-26 01:32:04 +00:00
|
|
|
def _decode_text_bytes(raw_bytes: bytes) -> str:
|
|
|
|
|
try:
|
|
|
|
|
return raw_bytes.decode('utf-8')
|
|
|
|
|
except UnicodeDecodeError:
|
|
|
|
|
return raw_bytes.decode('latin-1', errors='ignore')
|
|
|
|
|
|
|
|
|
|
def _extract_text_from_training_file(file_obj: TrainingFile) -> str:
    """Return the stripped text content of a stored training file.

    The extractor is chosen from the lower-cased ``file_name`` suffix:
    ``.pdf`` is read with ``PdfReader``, ``.docx`` with ``Document``, and
    anything else is read as raw bytes and decoded by
    ``_decode_text_bytes``. A missing ``file_name`` is treated as an empty
    name and therefore takes the raw-bytes path.
    """
    lowered_name = (file_obj.file_name or '').lower()

    # Exactly one branch runs, and each reads the stored file in binary mode.
    with file_obj.file.open('rb') as handle:
        if lowered_name.endswith('.pdf'):
            page_texts = []
            for page in PdfReader(handle).pages:
                # A falsy extract_text() result is replaced by an empty
                # string so the join below always receives str values.
                page_texts.append(page.extract_text() or '')
            extracted = '\n'.join(page_texts)
        elif lowered_name.endswith('.docx'):
            # Paragraphs with empty text are skipped entirely.
            extracted = '\n'.join(
                para.text for para in Document(handle).paragraphs if para.text
            )
        else:
            extracted = _decode_text_bytes(handle.read())

    return extracted.strip()
|
|
|
|
|
|
|
|
|
|
def _get_text_chunks(text: str, size: int = 10000):
|
|
|
|
|
"""Slices text into rough blocks to prevent HTTP timeouts."""
|
|
|
|
|
for i in range(0, len(text), size):
|
|
|
|
|
yield text[i:i + size]
|
|
|
|
|
|
|
|
|
|
@shared_task(name="apps.knowledge.tasks.ingest_training_file_task", bind=True, soft_time_limit=900, time_limit=1200)
def ingest_training_file_task(self, file_uuid):
    """Extract, chunk, embed and store the contents of one ``TrainingFile``.

    The file's text is extracted, sliced into blocks, and each block is
    posted to ``settings.INFERENCE_SEMANTIC_CHUNK_ENDPOINT``. The response
    is read as a mapping with parallel ``chunks`` and ``embeddings`` lists,
    and one ``RoleRagDocument`` is created per pair.

    Args:
        file_uuid: ``uuid`` of the ``TrainingFile`` to ingest.

    Returns:
        A human-readable summary string: either a "not found" message or
        the number of chunks processed.

    Raises:
        Exception: Any error raised during extraction, the HTTP calls or
            persistence is re-raised after the file is marked ``'failed'``
            and the error text is written to its ``description``.
    """
    try:
        file_obj = TrainingFile.objects.get(uuid=file_uuid)
    except TrainingFile.DoesNotExist:
        return f"File {file_uuid} not found."

    file_obj.status = 'ingesting'
    file_obj.save()

    try:
        raw_text = _extract_text_from_training_file(file_obj)
        if not raw_text:
            raise ValueError('No extractable text found.')

        all_documents = []
        chunk_counter = 0

        timeout = Timeout(60.0)
        with Client(timeout=timeout) as client:
            for text_segment in _get_text_chunks(raw_text):
                response = client.post(
                    settings.INFERENCE_SEMANTIC_CHUNK_ENDPOINT,
                    json={"text": text_segment, "threshold": 95},
                )
                response.raise_for_status()
                result = response.json()

                chunks = result['chunks']
                embeddings = result['embeddings']

                # zip() stops at the shorter input, so a mismatched response
                # would silently drop content; fail loudly instead.
                if len(chunks) != len(embeddings):
                    raise ValueError(
                        f"Inference service returned {len(chunks)} chunks "
                        f"but {len(embeddings)} embeddings."
                    )

                for chunk_text, embedding in zip(chunks, embeddings):
                    all_documents.append(RoleRagDocument(
                        role=file_obj.role,
                        training_file=file_obj,
                        content=chunk_text,
                        content_hash=hashlib.sha256(chunk_text.encode('utf-8')).hexdigest(),
                        embedding=embedding,
                        # chunk_index runs across the whole file, not per
                        # HTTP batch.
                        chunk_index=chunk_counter,
                        metadata={"source": file_obj.file_name},
                    ))
                    chunk_counter += 1

        # Commit the documents and the 'embedded' status together, so a
        # failed status save cannot leave stored documents behind a file
        # that ends up marked as failed.
        with transaction.atomic():
            RoleRagDocument.objects.bulk_create(all_documents)
            file_obj.status = 'embedded'
            file_obj.is_processed = True
            file_obj.save()

        return f"Processed {chunk_counter} chunks via batching."

    except Exception as e:
        # Record the failure on the file, then let Celery see the error.
        file_obj.status = 'failed'
        file_obj.description = str(e)
        file_obj.save()
        # Bare raise re-raises the active exception with its original
        # traceback.
        raise
|