diff --git a/apps/agents/admin.py b/apps/agents/admin.py new file mode 100644 index 0000000..9a1c411 --- /dev/null +++ b/apps/agents/admin.py @@ -0,0 +1,23 @@ +from django.contrib import admin +from apps.agents.models import Agent, AgentExecution, AgentEvent + + +@admin.register(Agent) +class AgentAdmin(admin.ModelAdmin): + list_display = ('name', 'user', 'status', 'created_at') + list_filter = ('status', 'created_at') + search_fields = ('name', 'description') + + +@admin.register(AgentExecution) +class AgentExecutionAdmin(admin.ModelAdmin): + list_display = ('agent', 'user', 'status', 'created_at') + list_filter = ('status', 'created_at') + search_fields = ('agent__name',) + + +@admin.register(AgentEvent) +class AgentEventAdmin(admin.ModelAdmin): + list_display = ('event_type', 'execution', 'timestamp') + list_filter = ('event_type', 'timestamp') + search_fields = ('execution__agent__name',) \ No newline at end of file diff --git a/apps/agents/consumers.py b/apps/agents/consumers.py new file mode 100644 index 0000000..4ce7cfa --- /dev/null +++ b/apps/agents/consumers.py @@ -0,0 +1,163 @@ +import json +from channels.generic.websocket import AsyncWebsocketConsumer +from channels.db import database_sync_to_async +from apps.agents.models import Agent, AgentExecution, AgentEvent +from apps.agents.tasks import start_agent_task_mcp + + +class AgentConsumer(AsyncWebsocketConsumer): + async def connect(self): + self.user = self.scope["user"] + self.agent_id = self.scope['url_route']['kwargs'].get('agent_id') + self.room_group_name = f"agent_{self.agent_id}" + + if not self.user.is_authenticated: + await self.close() + return + + await self.channel_layer.group_add(self.room_group_name, self.channel_name) + await self.accept() + await self.send(json.dumps({ + "type": "connection", + "message": "Connected to agent stream", + "agent_id": str(self.agent_id) + })) + + async def disconnect(self, close_code): + await self.channel_layer.group_discard(self.room_group_name, self.channel_name) + + async def receive(self, text_data): + try: + data = json.loads(text_data) + action = data.get('action') + + if action == 'start_agent': + await self.handle_start_agent(data) + elif action == 'stop_agent': + await self.handle_stop_agent(data) + else: + await self.send(json.dumps({ + "type": "error", + "message": f"Unknown action: {action}" + })) + except json.JSONDecodeError: + await self.send(json.dumps({ + "type": "error", + "message": "Invalid JSON" + })) + except Exception as e: + await self.send(json.dumps({ + "type": "error", + "message": str(e) + })) + + async def handle_start_agent(self, data): + input_data = data.get('input_data', {}) + + agent = await self.get_agent(self.agent_id, self.user) + if not agent: + await self.send(json.dumps({ + "type": "error", + "message": "Agent not found" + })) + return + + execution = await self.create_execution(agent, self.user, input_data) + + await self.send(json.dumps({ + "type": "execution_started", + "execution_id": str(execution.uuid), + "agent_id": str(agent.uuid), + "message": f"Agent execution {execution.uuid} queued" + })) + + try: + from apps.agents.tasks import start_agent_task_mcp + + print(f"[Consumer] Queuing MCP execution for {execution.uuid}") + start_agent_task_mcp.delay(str(execution.uuid)) + + except Exception as e: + print(f"Error queuing agent task: {e}") + await self.send(json.dumps({ + "type": "execution_error", + "execution_id": str(execution.uuid), + "error_message": str(e) + })) + + async def handle_stop_agent(self, data): + execution_id = data.get('execution_id') + execution = await self.get_execution(execution_id, self.user) + + if not execution: + await self.send(json.dumps({ + "type": "error", + "message": "Execution not found" + })) + return + + await self.update_execution_status(execution, 'failed') + await self.send(json.dumps({ + "type": "execution_stopped", + "execution_id": str(execution.uuid), + "message": "Agent execution stopped by user" + })) + + async def agent_event(self, event): + await self.send(json.dumps({ + "type": "agent_event", + "event_type": event['event_type'], + "content": event['content'], + "timestamp": event['timestamp'] + })) + + async def agent_completed(self, event): + await self.send(json.dumps({ + "type": "execution_completed", + "execution_id": event['execution_id'], + "output_data": event['output_data'], + "message": "Agent execution completed" + })) + + async def agent_error(self, event): + await self.send(json.dumps({ + "type": "execution_error", + "execution_id": event['execution_id'], + "error_message": event['error_message'] + })) + + @database_sync_to_async + def get_agent(self, agent_id, user): + try: + return Agent.objects.get(uuid=agent_id, user=user) + except Agent.DoesNotExist: + return None + + @database_sync_to_async + def get_execution(self, execution_id, user): + try: + return AgentExecution.objects.get(uuid=execution_id, user=user) + except AgentExecution.DoesNotExist: + return None + + @database_sync_to_async + def create_execution(self, agent, user, input_data): + return AgentExecution.objects.create( + agent=agent, + user=user, + input_data=input_data + ) + + @database_sync_to_async + def update_execution_status(self, execution, status): + execution.status = status + execution.save() + return execution + + @database_sync_to_async + def create_event(self, execution, event_type, content): + return AgentEvent.objects.create( + execution=execution, + event_type=event_type, + content=content + ) diff --git a/apps/agents/migrations/0001_initial.py b/apps/agents/migrations/0001_initial.py new file mode 100644 index 0000000..f7fbb30 --- /dev/null +++ b/apps/agents/migrations/0001_initial.py @@ -0,0 +1,66 @@ +# Generated by Django 5.2.8 on 2025-12-17 14:05 + +import django.db.models.deletion +import uuid +from django.conf import settings +from django.db import migrations, models + + +class Migration(migrations.Migration): + + initial = True + + dependencies = [ + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ] + + operations = [ + migrations.CreateModel( + name='Agent', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('uuid', models.UUIDField(default=uuid.uuid4, editable=False, unique=True)), + ('name', models.CharField(max_length=255)), + ('description', models.TextField(blank=True, default='')), + ('status', models.CharField(choices=[('idle', 'Idle'), ('running', 'Running'), ('paused', 'Paused'), ('completed', 'Completed'), ('failed', 'Failed')], default='idle', max_length=20)), + ('task_id', models.CharField(blank=True, max_length=255, null=True)), + ('created_at', models.DateTimeField(auto_now_add=True)), + ('updated_at', models.DateTimeField(auto_now=True)), + ('started_at', models.DateTimeField(blank=True, null=True)), + ('completed_at', models.DateTimeField(blank=True, null=True)), + ('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='agents', to=settings.AUTH_USER_MODEL)), + ], + ), + migrations.CreateModel( + name='AgentExecution', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('uuid', models.UUIDField(default=uuid.uuid4, editable=False, unique=True)), + ('status', models.CharField(choices=[('queued', 'Queued'), ('running', 'Running'), ('completed', 'Completed'), ('failed', 'Failed')], default='queued', max_length=20)), + ('input_data', models.JSONField(default=dict)), + ('output_data', models.JSONField(blank=True, default=dict)), + ('error_message', models.TextField(blank=True, default='')), + ('created_at', models.DateTimeField(auto_now_add=True)), + ('started_at', models.DateTimeField(blank=True, null=True)), + ('completed_at', models.DateTimeField(blank=True, null=True)), + ('agent', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='executions', to='agents.agent')), + ('user', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='agent_executions', to=settings.AUTH_USER_MODEL)), + ], + ), + migrations.CreateModel( + name='AgentEvent', + fields=[ + ('id', models.BigAutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')), + ('uuid', models.UUIDField(default=uuid.uuid4, editable=False, unique=True)), + ('event_type', models.CharField(choices=[('started', 'Started'), ('message', 'Message'), ('progress', 'Progress'), ('completed', 'Completed'), ('error', 'Error'), ('step', 'Step')], max_length=20)), + ('content', models.JSONField()), + ('timestamp', models.DateTimeField(auto_now_add=True)), + ('execution', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='events', to='agents.agentexecution')), + ], + options={ + 'verbose_name': 'Agent Event', + 'verbose_name_plural': 'Agent Events', + 'ordering': ['timestamp'], + }, + ), + ] diff --git a/apps/agents/migrations/__init__.py b/apps/agents/migrations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/apps/agents/models.py b/apps/agents/models.py index c1067c3..340f09c 100644 --- a/apps/agents/models.py +++ b/apps/agents/models.py @@ -1,22 +1,88 @@ from django.db import models from django.utils import timezone +from apps.users.models import User +import uuid -class AgentRun(models.Model): - agent_name = models.CharField(max_length=255) - input_text = models.TextField() - output_text = models.TextField(blank=True, null=True) - created_at = models.DateTimeField(default=timezone.now) +class Agent(models.Model): + STATUS_CHOICES = [ + ('idle', 'Idle'), + ('running', 'Running'), + ('paused', 'Paused'), + ('completed', 'Completed'), + ('failed', 'Failed'), + ] - class Meta: - ordering = ["-created_at"] + uuid = models.UUIDField(default=uuid.uuid4, editable=False, unique=True) + user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='agents') + name = models.CharField(max_length=255) + description = models.TextField(blank=True, default="") + status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='idle') + task_id = models.CharField(max_length=255, blank=True, null=True) + + created_at = models.DateTimeField(auto_now_add=True) + updated_at = models.DateTimeField(auto_now=True) + started_at = models.DateTimeField(null=True, blank=True) + completed_at = models.DateTimeField(null=True, blank=True) + + def __str__(self) -> str: + return f"{self.name} ({self.status})" + + class Meta: + verbose_name = "Agent" + verbose_name_plural = "Agents" -class ModelMeta(models.Model): - name = models.CharField(max_length=255) - path = models.CharField(max_length=1024, blank=True, null=True) - framework = models.CharField(max_length=100, blank=True, null=True) - created_at = models.DateTimeField(default=timezone.now) +class AgentExecution(models.Model): + uuid = models.UUIDField(default=uuid.uuid4, editable=False, unique=True) + agent = models.ForeignKey(Agent, on_delete=models.CASCADE, related_name='executions') + user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='agent_executions') + + status = models.CharField(max_length=20, choices=[ + ('queued', 'Queued'), + ('running', 'Running'), + ('completed', 'Completed'), + ('failed', 'Failed'), + ], default='queued') + + input_data = models.JSONField(default=dict) + output_data = models.JSONField(default=dict, blank=True) + error_message = models.TextField(blank=True, default="") + + created_at = models.DateTimeField(auto_now_add=True) + started_at = models.DateTimeField(null=True, blank=True) + completed_at = models.DateTimeField(null=True, blank=True) + + def __str__(self) -> str: + return f"Execution {self.uuid} - {self.agent.name} ({self.status})" + + class Meta: + verbose_name = "Agent Execution" + verbose_name_plural = "Agent Executions" + + +class AgentEvent(models.Model): + EVENT_TYPES = [ + ('started', 'Started'), + ('message', 'Message'), + ('progress', 'Progress'), + ('completed', 'Completed'), + ('error', 'Error'), + ('step', 'Step'), + ] + + uuid = models.UUIDField(default=uuid.uuid4, editable=False, unique=True) + execution = models.ForeignKey(AgentExecution, on_delete=models.CASCADE, related_name='events') + event_type = models.CharField(max_length=20, choices=EVENT_TYPES) + + content = models.JSONField() + timestamp = models.DateTimeField(auto_now_add=True) + + def __str__(self) -> str: + return f"{self.id} - {self.event_type} - {self.execution.agent.name}" + + class Meta: + ordering = ['timestamp'] + verbose_name = "Agent Event" + verbose_name_plural = "Agent Events" - def __str__(self) -> str: # pragma: no cover - trivial - return self.name diff --git a/apps/agents/routing.py b/apps/agents/routing.py new file mode 100644 index 0000000..07e7d37 --- /dev/null +++ b/apps/agents/routing.py @@ -0,0 +1,7 @@ +from django.urls import path +from apps.agents import consumers + + +websocket_urlpatterns = [ + path('ws/agents//', consumers.AgentConsumer.as_asgi()), +] diff --git a/apps/agents/serializers.py b/apps/agents/serializers.py new file mode 100644 index 0000000..9ac12c8 --- /dev/null +++ b/apps/agents/serializers.py @@ -0,0 +1,26 @@ +from rest_framework import serializers +from apps.agents.models import Agent, AgentExecution, AgentEvent + + +class AgentEventSerializer(serializers.ModelSerializer): + class Meta: + model = AgentEvent + fields = ['uuid', 'event_type', 'content', 'timestamp'] + + +class AgentExecutionSerializer(serializers.ModelSerializer): + events = AgentEventSerializer(many=True, read_only=True) + + class Meta: + model = AgentExecution + fields = ['uuid', 'agent', 'user', 'status', 'input_data', 'output_data', 'error_message', 'created_at', 'started_at', 'completed_at', 'events'] + read_only_fields = ['uuid', 'created_at', 'started_at', 'completed_at', 'events'] + + +class AgentSerializer(serializers.ModelSerializer): + executions = AgentExecutionSerializer(many=True, read_only=True) + + class Meta: + model = Agent + fields = ['uuid', 'user', 'name', 'description', 'status', 'task_id', 'created_at', 'updated_at', 'started_at', 'completed_at', 'executions'] + read_only_fields = ['uuid', 'user', 'created_at', 'updated_at'] diff --git a/apps/agents/tasks.py b/apps/agents/tasks.py new file mode 100644 index 0000000..4b2384b --- /dev/null +++ b/apps/agents/tasks.py @@ -0,0 +1,135 @@ +from celery import shared_task +from django.utils import timezone +from channels.layers import get_channel_layer +from asgiref.sync import async_to_sync +from apps.agents.models import Agent, AgentExecution, AgentEvent +import json +from django.conf import settings +import asyncio + + +@shared_task +def start_agent_task_mcp(execution_id): + print(f"[start_agent_task_mcp] invoked with execution_id={execution_id}") + try: + execution = AgentExecution.objects.get(uuid=execution_id) + print(f"[start_agent_task_mcp] execution record loaded: agent={execution.agent.uuid}") + execution.status = 'running' + execution.started_at = timezone.now() + execution.save() + + channel_layer = get_channel_layer() + room_group_name = f"agent_{execution.agent.uuid}" + + try: + async_to_sync(channel_layer.group_send)( + room_group_name, + { + "type": "agent_event", + "event_type": "started", + "content": { + "execution_id": str(execution.uuid), + "agent_id": str(execution.agent.uuid), + "message": "Agent execution started (via MCP)" + }, + "timestamp": timezone.now().isoformat() + } + ) + except Exception as channel_error: + print(f"Channel layer error: {channel_error}") + + AgentEvent.objects.create( + execution=execution, + event_type='started', + content={"execution_id": str(execution.uuid), "method": "mcp"} + ) + + from mcp_agent.mcp_client import MCPAgentClient + + async def execute_remote(): + async with MCPAgentClient() as client: + return await client.execute_agent( + agent_id=str(execution.agent.uuid), + agent_name=execution.agent.name, + execution_id=str(execution.uuid), + query=execution.input_data.get("query", ""), + input_data=execution.input_data + ) + + result = asyncio.run(execute_remote()) + print(f"[start_agent_task_mcp] MCP result: {result.get('status')}") + + if result.get('events'): + for event in result['events']: + try: + async_to_sync(channel_layer.group_send)( + room_group_name, + { + "type": "agent_event", + "event_type": event.get('type', 'message'), + "content": event, + "timestamp": event.get('timestamp', timezone.now().isoformat()) + } + ) + except Exception as e: + print(f"Error forwarding event: {e}") + + + if result.get('status') == 'completed': + execution.status = 'completed' + execution.output_data = result + elif result.get('status') in ['failed', 'error']: + execution.status = 'failed' + execution.error_message = result.get('error', 'Unknown error') + execution.output_data = result + else: + execution.status = 'completed' + execution.output_data = result + + execution.completed_at = timezone.now() + execution.save() + + try: + async_to_sync(channel_layer.group_send)( + room_group_name, + { + "type": "agent_completed", + "execution_id": str(execution.uuid), + "output_data": result + } + ) + except Exception as channel_error: + print(f"Channel layer error: {channel_error}") + + AgentEvent.objects.create( + execution=execution, + event_type='completed', + content={"execution_id": str(execution.uuid), "output": result} + ) + + except AgentExecution.DoesNotExist: + print(f"Execution {execution_id} not found") + except Exception as e: + print(f"[start_agent_task_mcp] exception: {e}") + import traceback + traceback.print_exc() + try: + execution = AgentExecution.objects.get(uuid=execution_id) + execution.status = 'failed' + execution.error_message = str(e) + execution.completed_at = timezone.now() + execution.save() + + channel_layer = get_channel_layer() + room_group_name = f"agent_{execution.agent.uuid}" + async_to_sync(channel_layer.group_send)( + room_group_name, + { + "type": "agent_error", + "execution_id": str(execution.uuid), + "error_message": str(e) + } + ) + except: + pass + diff --git a/apps/agents/viewsets.py b/apps/agents/viewsets.py new file mode 100644 index 0000000..7024101 --- /dev/null +++ b/apps/agents/viewsets.py @@ -0,0 +1,57 @@ +from rest_framework import viewsets +from rest_framework.decorators import action +from rest_framework.response import Response +from rest_framework.permissions import IsAuthenticated +from apps.agents.models import Agent, AgentExecution +from apps.agents.serializers import AgentSerializer, AgentExecutionSerializer +from apps.agents.tasks import start_agent_task_mcp + + +class AgentViewSet(viewsets.ModelViewSet): + serializer_class = AgentSerializer + permission_classes = [IsAuthenticated] + lookup_field = 'uuid' + + def get_queryset(self): + return Agent.objects.filter(user=self.request.user) + + def perform_create(self, serializer): + serializer.save(user=self.request.user) + + @action(detail=True, methods=['post']) + def start(self, request, uuid=None): + agent = self.get_object() + input_data = request.data.get('input_data', {}) + + execution = AgentExecution.objects.create( + agent=agent, + user=request.user, + input_data=input_data + ) + + start_agent_task_mcp.delay(str(execution.uuid)) + + serializer = AgentExecutionSerializer(execution) + return Response({ + "status": "queued", + "execution": serializer.data, + "message": "Agent task queued for execution" + }) + + +class AgentExecutionViewSet(viewsets.ReadOnlyModelViewSet): + serializer_class = AgentExecutionSerializer + permission_classes = [IsAuthenticated] + lookup_field = 'uuid' + + def get_queryset(self): + return AgentExecution.objects.filter(user=self.request.user) + + @action(detail=True, methods=['get']) + def events(self, request, uuid=None): + execution = self.get_object() + events = execution.events.all().values() + return Response({ + "execution_id": str(execution.uuid), + "events": list(events) + })