Dynavera/mcp_agent/mcp_client.py

87 lines
3.5 KiB
Python
Raw Normal View History

import httpx
import asyncio
2026-01-28 09:56:02 +00:00
import logging
logger = logging.getLogger(__name__)


class MCPClient:
    """Async HTTP client for an MCP tool server.

    Tool invocations are POSTed as JSON to ``{server_url}/execute`` and a
    health probe is available at ``{server_url}/health``. A single
    ``httpx.AsyncClient`` is shared by all requests; release it with
    :meth:`close` or by using the instance in an ``async with`` block.
    """

    def __init__(self, server_url: str, timeout: int = 3600):
        """Create the client.

        Args:
            server_url: Base URL of the server, e.g. ``http://localhost:8001``.
            timeout: Timeout in seconds handed to ``httpx.AsyncClient``.
        """
        self.server_url = server_url
        # Kept so that timeout errors report the configured value rather
        # than a hard-coded one.
        self.timeout = timeout
        self.client = httpx.AsyncClient(timeout=timeout)
        logger.info(f"MCPClient initialized for {server_url} with timeout={timeout}s ({timeout//60} minutes)")

    async def __aenter__(self):
        """Return the client itself so it can be used with ``async with``."""
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Close the underlying HTTP client when the ``async with`` block exits."""
        await self.close()

    def _url(self, path: str) -> str:
        """Join *path* onto the base URL without producing a double slash."""
        return f"{self.server_url.rstrip('/')}/{path}"

    async def send(self, tool: str, arguments: dict):
        """Invoke *tool* on the server and return the decoded JSON reply.

        Args:
            tool: Name of the tool to execute.
            arguments: JSON-serialisable arguments forwarded to the tool.

        Returns:
            The decoded JSON body of the response.

        Raises:
            Exception: If the request times out or fails in transport, the
                server answers with an HTTP status >= 400, the body is not
                valid JSON, or the reply is an object whose ``status`` is
                ``"failed"``. Transport and parse errors are chained as
                ``__cause__``.
        """
        url = self._url("execute")
        logger.info(f"MCPClient: Sending request to {url} for tool '{tool}'")
        try:
            response = await self.client.post(
                url,
                json={
                    "tool": tool,
                    "arguments": arguments,
                },
            )
        except (httpx.TimeoutException, asyncio.TimeoutError) as e:
            # httpx reports timeouts as httpx.TimeoutException, never as
            # asyncio.TimeoutError; the latter is kept for compatibility.
            logger.error(f"MCPClient: Request timeout for tool '{tool}': {str(e)}")
            raise Exception(
                f"MCP tool '{tool}' request timed out (>{self.timeout}s). "
                "Model loading or fine-tuning may be too slow."
            ) from e
        except Exception as e:
            logger.error(f"MCPClient: Request failed for tool '{tool}': {str(e)}", exc_info=True)
            raise Exception(f"MCP tool '{tool}' request failed: {str(e)}") from e

        logger.info(f"MCPClient: Received response with status={response.status_code}")
        logger.debug(f"MCPClient: Response headers: {response.headers}")

        if response.status_code >= 400:
            try:
                error_data = response.json()
            except ValueError:
                # Error bodies are often plain text or HTML; fall back to the status code.
                error_data = {}
                logger.error(f"MCPClient: HTTP error {response.status_code} (could not parse JSON)")
            else:
                logger.error(f"MCPClient: HTTP error {response.status_code}: {error_data}")
            # The body may be valid JSON without being an object (list, string, ...).
            fields = error_data if isinstance(error_data, dict) else {}
            error_msg = fields.get("error") or fields.get("details") or f"HTTP {response.status_code}"
            raise Exception(f"MCP tool '{tool}' failed: {error_msg}. Full response: {error_data}")

        try:
            result = response.json()
        except ValueError as e:
            logger.error(f"MCPClient: Failed to parse response JSON: {str(e)}")
            logger.error(f"MCPClient: Raw response text: {response.text[:500]}")
            raise Exception(f"MCP tool '{tool}' returned invalid JSON: {str(e)}") from e

        # Only JSON objects carry a status; other JSON values are passed through as-is.
        status = result.get("status") if isinstance(result, dict) else None
        logger.debug(f"MCPClient: Parsed JSON response: status={status}")

        if status == "failed":
            error_msg = result.get("error") or result.get("details") or "Unknown error"
            traceback_info = result.get("traceback", "")
            full_error = f"MCP tool '{tool}' returned failure: {error_msg}"
            if traceback_info:
                full_error += f"\n\nServer traceback:\n{traceback_info}"
            logger.error(f"MCPClient: {full_error}")
            raise Exception(full_error)

        logger.info(f"MCPClient: Tool '{tool}' completed successfully")
        return result

    async def health(self):
        """Query ``{server_url}/health`` and return its decoded JSON body.

        Raises:
            httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
        """
        response = await self.client.get(self._url("health"))
        response.raise_for_status()
        return response.json()

    async def close(self):
        """Close the underlying ``httpx.AsyncClient``."""
        await self.client.aclose()
async def main():
    """Smoke test: call the ``echo`` tool on a local server and print the reply."""
    client = MCPClient("http://localhost:8001")
    try:
        result = await client.send(
            tool="echo",
            arguments={"message": "hello from client"},
        )
        print(result)
    finally:
        # Release the HTTP connection pool even when the request raises.
        await client.close()
# Script entry point: run the demo coroutine only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    asyncio.run(main())