Dynavera/mcp_agent/mcp_client.py
Viswamedha Nalabotu 03490762be Fixed core issues
2025-12-20 20:40:23 +00:00

129 lines
4 KiB
Python

import httpx
import json
from typing import Optional, Dict, Any, List
from django.conf import settings
import asyncio
import logging
logger = logging.getLogger(__name__)
class MCPAgentClient:
def __init__(self, server_url: Optional[str] = None):
self.server_url = server_url or getattr(settings, 'MCP_AGENT_URL')
self.http_client = httpx.AsyncClient(
timeout=httpx.Timeout(300.0),
follow_redirects=True
)
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self.http_client:
await self.http_client.aclose()
async def execute_agent(
self,
agent_id: str,
agent_name: str,
execution_id: str,
query: str,
input_data: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
arguments = {
"agent_id": agent_id,
"agent_name": agent_name,
"execution_id": execution_id,
"query": query,
"input_data": input_data or {}
}
return await self._execute_via_http(arguments)
async def _execute_via_http(self, arguments: Dict[str, Any]) -> Dict[str, Any]:
if not self.http_client:
raise RuntimeError("HTTP client not initialized")
try:
response = await self.http_client.post(
f"{self.server_url}/execute",
json={
"tool": "execute_agent",
"arguments": arguments
},
headers={"Content-Type": "application/json"}
)
response.raise_for_status()
return response.json()
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error from MCP server: {e.response.status_code} - {e.response.text}")
return {
"status": "failed",
"error": f"Server returned {e.response.status_code}",
"error_type": "HTTPError",
"details": e.response.text
}
except httpx.RequestError as e:
logger.error(f"Request error to MCP server: {e}")
return {
"status": "failed",
"error": f"Failed to connect to MCP server at {self.server_url}",
"error_type": "ConnectionError"
}
except Exception as e:
logger.error(f"Unexpected error in HTTP execution: {e}")
return {
"status": "failed",
"error": str(e),
"error_type": type(e).__name__
}
async def health_check(self) -> Dict[str, Any]:
try:
response = await self.http_client.get(f"{self.server_url}/health")
response.raise_for_status()
return response.json()
except Exception as e:
return {"status": "unhealthy", "error": str(e)}
async def list_tools(self) -> List[Dict[str, Any]]:
return [
{
"name": "execute_agent",
"description": "Execute an AI agent with given query and input data"
},
{
"name": "health_check",
"description": "Check if the agent server is healthy"
}
]
async def close(self):
if self.http_client:
await self.http_client.aclose()
_mcp_client_instance: Optional[MCPAgentClient] = None
_client_lock = asyncio.Lock()
async def get_mcp_client() -> MCPAgentClient:
global _mcp_client_instance
async with _client_lock:
if _mcp_client_instance is None:
server_url = getattr(settings, 'MCP_AGENT_URL')
_mcp_client_instance = MCPAgentClient(server_url=server_url)
return _mcp_client_instance
async def close_mcp_client():
global _mcp_client_instance
async with _client_lock:
if _mcp_client_instance is not None:
await _mcp_client_instance.close()
_mcp_client_instance = None