Dynavera/apps/agents/tasks.py
2025-12-17 14:11:23 +00:00

135 lines
3.7 KiB
Python

from celery import shared_task
from django.utils import timezone
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
from apps.agents.models import Agent, AgentExecution, AgentEvent
import json
from django.conf import settings
import asyncio
@shared_task
def start_agent_task_mcp(execution_id):
print(f"[start_agent_task_mcp] invoked with execution_id={execution_id}")
try:
execution = AgentExecution.objects.get(uuid=execution_id)
print(f"[start_agent_task_mcp] execution record loaded: agent={execution.agent.uuid}")
execution.status = 'running'
execution.started_at = timezone.now()
execution.save()
channel_layer = get_channel_layer()
room_group_name = f"agent_{execution.agent.uuid}"
try:
async_to_sync(channel_layer.group_send)(
room_group_name,
{
"type": "agent_event",
"event_type": "started",
"content": {
"execution_id": str(execution.uuid),
"agent_id": str(execution.agent.uuid),
"message": "Agent execution started (via MCP)"
},
"timestamp": timezone.now().isoformat()
}
)
except Exception as channel_error:
print(f"Channel layer error: {channel_error}")
AgentEvent.objects.create(
execution=execution,
event_type='started',
content={"execution_id": str(execution.uuid), "method": "mcp"}
)
from mcp_agent.mcp_client import MCPAgentClient
async def execute_remote():
async with MCPAgentClient() as client:
return await client.execute_agent(
agent_id=str(execution.agent.uuid),
agent_name=execution.agent.name,
execution_id=str(execution.uuid),
query=execution.input_data.get("query", ""),
input_data=execution.input_data
)
result = asyncio.run(execute_remote())
print(f"[start_agent_task_mcp] MCP result: {result.get('status')}")
if result.get('events'):
for event in result['events']:
try:
async_to_sync(channel_layer.group_send)(
room_group_name,
{
"type": "agent_event",
"event_type": event.get('type', 'message'),
"content": event,
"timestamp": event.get('timestamp', timezone.now().isoformat())
}
)
except Exception as e:
print(f"Error forwarding event: {e}")
if result.get('status') == 'completed':
execution.status = 'completed'
execution.output_data = result
elif result.get('status') in ['failed', 'error']:
execution.status = 'failed'
execution.error_message = result.get('error', 'Unknown error')
execution.output_data = result
else:
execution.status = 'completed'
execution.output_data = result
execution.completed_at = timezone.now()
execution.save()
try:
async_to_sync(channel_layer.group_send)(
room_group_name,
{
"type": "agent_completed",
"execution_id": str(execution.uuid),
"output_data": result
}
)
except Exception as channel_error:
print(f"Channel layer error: {channel_error}")
AgentEvent.objects.create(
execution=execution,
event_type='completed',
content={"execution_id": str(execution.uuid), "output": result}
)
except AgentExecution.DoesNotExist:
print(f"Execution {execution_id} not found")
except Exception as e:
print(f"[start_agent_task_mcp] exception: {e}")
import traceback
traceback.print_exc()
try:
execution = AgentExecution.objects.get(uuid=execution_id)
execution.status = 'failed'
execution.error_message = str(e)
execution.completed_at = timezone.now()
execution.save()
channel_layer = get_channel_layer()
room_group_name = f"agent_{execution.agent.uuid}"
async_to_sync(channel_layer.group_send)(
room_group_name,
{
"type": "agent_error",
"execution_id": str(execution.uuid),
"error_message": str(e)
}
)
except:
pass