import asyncio
import json

from asgiref.sync import async_to_sync
from celery import shared_task
from channels.layers import get_channel_layer
from django.conf import settings
from django.utils import timezone

from apps.agents.models import Agent, AgentExecution, AgentEvent


@shared_task
def start_agent_task_mcp(execution_id):
    """Run an agent execution through the MCP client and record the outcome.

    The task loads the ``AgentExecution`` identified by ``execution_id``,
    marks it ``running``, broadcasts a ``started`` event to the agent's
    channel group, runs the agent via ``MCPAgentClient.execute_agent``,
    forwards every event returned by the client to the same group, and
    finally persists the status/output on the execution row.

    Channel-layer failures are treated as non-fatal: they are printed and
    the task keeps going. Any other exception marks the execution as
    ``failed`` (best effort) and broadcasts an ``agent_error`` message.

    Args:
        execution_id: UUID (or its string form) of the ``AgentExecution``
            row to run.

    Returns:
        None. Results are written to the database and the channel layer.
    """
    print(f"[start_agent_task_mcp] invoked with execution_id={execution_id}")
    try:
        execution = AgentExecution.objects.get(uuid=execution_id)
        print(f"[start_agent_task_mcp] execution record loaded: agent={execution.agent.uuid}")

        execution.status = 'running'
        execution.started_at = timezone.now()
        execution.save()

        channel_layer = get_channel_layer()
        room_group_name = f"agent_{execution.agent.uuid}"

        # Broadcasting is best effort: a broken channel layer must not
        # prevent the agent from running.
        try:
            async_to_sync(channel_layer.group_send)(
                room_group_name,
                {
                    "type": "agent_event",
                    "event_type": "started",
                    "content": {
                        "execution_id": str(execution.uuid),
                        "agent_id": str(execution.agent.uuid),
                        "message": "Agent execution started (via MCP)",
                    },
                    "timestamp": timezone.now().isoformat(),
                },
            )
        except Exception as channel_error:
            print(f"Channel layer error: {channel_error}")

        AgentEvent.objects.create(
            execution=execution,
            event_type='started',
            content={"execution_id": str(execution.uuid), "method": "mcp"},
        )

        # Imported inside the task, as in the original code, so the module
        # can be loaded without importing the MCP client.
        from mcp_agent.mcp_client import MCPAgentClient

        async def execute_remote():
            async with MCPAgentClient() as client:
                return await client.execute_agent(
                    agent_id=str(execution.agent.uuid),
                    agent_name=execution.agent.name,
                    execution_id=str(execution.uuid),
                    query=execution.input_data.get("query", ""),
                    input_data=execution.input_data,
                )

        result = asyncio.run(execute_remote())
        print(f"[start_agent_task_mcp] MCP result: {result.get('status')}")

        # Relay each event produced by the remote run; one bad event must
        # not stop the remaining ones from being forwarded.
        for event in result.get('events') or []:
            try:
                async_to_sync(channel_layer.group_send)(
                    room_group_name,
                    {
                        "type": "agent_event",
                        "event_type": event.get('type', 'message'),
                        "content": event,
                        "timestamp": event.get('timestamp', timezone.now().isoformat()),
                    },
                )
            except Exception as e:
                print(f"Error forwarding event: {e}")

        # Only an explicit 'failed'/'error' status is a failure; any other
        # status (including a missing one) is recorded as completed.
        if result.get('status') in ['failed', 'error']:
            execution.status = 'failed'
            execution.error_message = result.get('error', 'Unknown error')
        else:
            execution.status = 'completed'
        execution.output_data = result
        execution.completed_at = timezone.now()
        execution.save()

        # NOTE(review): a result with status 'failed'/'error' still emits
        # "agent_completed" and a 'completed' AgentEvent below. Kept as-is
        # because consumers may rely on it -- confirm whether "agent_error"
        # would be more appropriate for that case.
        try:
            async_to_sync(channel_layer.group_send)(
                room_group_name,
                {
                    "type": "agent_completed",
                    "execution_id": str(execution.uuid),
                    "output_data": result,
                },
            )
        except Exception as channel_error:
            print(f"Channel layer error: {channel_error}")

        AgentEvent.objects.create(
            execution=execution,
            event_type='completed',
            content={"execution_id": str(execution.uuid), "output": result},
        )

    except AgentExecution.DoesNotExist:
        print(f"Execution {execution_id} not found")
    except Exception as e:
        print(f"[start_agent_task_mcp] exception: {e}")
        import traceback
        traceback.print_exc()

        # Best-effort failure bookkeeping. Catch Exception (not a bare
        # except) so KeyboardInterrupt/SystemExit still propagate, and log
        # the secondary error instead of silently discarding it.
        try:
            execution = AgentExecution.objects.get(uuid=execution_id)
            execution.status = 'failed'
            execution.error_message = str(e)
            execution.completed_at = timezone.now()
            execution.save()

            channel_layer = get_channel_layer()
            room_group_name = f"agent_{execution.agent.uuid}"
            async_to_sync(channel_layer.group_send)(
                room_group_name,
                {
                    "type": "agent_error",
                    "execution_id": str(execution.uuid),
                    "error_message": str(e),
                },
            )
        except Exception as cleanup_error:
            print(f"[start_agent_task_mcp] could not record failure: {cleanup_error}")