# Dynavera/mcp_agent/mcp_client.py
#
# 42 lines
# 1,003 B
# Python

import httpx
import asyncio
class MCPClient:
    """Small async HTTP client for a tool server.

    Talks to two endpoints under ``server_url``: ``POST /execute`` to run a
    tool and ``GET /health`` for a health check. The instance owns an
    ``httpx.AsyncClient``; release it with :meth:`close` or by using the
    instance as an async context manager (``async with MCPClient(url) as c``).
    """

    def __init__(self, server_url: str):
        """Remember *server_url* and create the underlying HTTP client.

        Args:
            server_url: Base URL of the server, with or without a trailing
                slash. It is stored unchanged on ``self.server_url``.
        """
        self.server_url = server_url
        # One shared client with a 60-second timeout for every request.
        self.client = httpx.AsyncClient(timeout=60)

    async def __aenter__(self):
        """Return the client itself on entering ``async with``."""
        return self

    async def __aexit__(self, exc_type, exc, tb):
        """Close the HTTP client on exit; exceptions are not suppressed."""
        await self.close()

    def _endpoint(self, path: str) -> str:
        """Join *path* onto the base URL with exactly one slash between them."""
        # Strip trailing slashes so "http://host/" does not yield "//execute".
        base = self.server_url.rstrip("/")
        return f"{base}/{path}"

    async def send(self, tool: str, arguments: dict):
        """POST a tool invocation to ``/execute`` and return the decoded JSON.

        Args:
            tool: Name of the tool, sent as the ``"tool"`` field.
            arguments: Tool arguments, sent as the ``"arguments"`` field.

        Returns:
            Whatever ``response.json()`` decodes from the response body.

        Raises:
            httpx.HTTPStatusError: Raised by ``raise_for_status()`` when the
                response has an error status.
        """
        payload = {
            "tool": tool,
            "arguments": arguments,
        }
        response = await self.client.post(self._endpoint("execute"), json=payload)
        response.raise_for_status()
        return response.json()

    async def health(self):
        """GET ``/health`` and return the decoded JSON body.

        Raises:
            httpx.HTTPStatusError: Raised by ``raise_for_status()`` when the
                response has an error status.
        """
        response = await self.client.get(self._endpoint("health"))
        response.raise_for_status()
        return response.json()

    async def close(self):
        """Close the underlying ``httpx.AsyncClient``."""
        await self.client.aclose()
async def main():
    """Send one ``echo`` request to the local server and print the reply.

    Connects to ``http://localhost:8001``, invokes the ``echo`` tool with a
    fixed message, and prints the decoded response. The client is closed
    even when the request fails; the exception still propagates.
    """
    client = MCPClient("http://localhost:8001")
    try:
        result = await client.send(
            tool="echo",
            arguments={"message": "hello from client"},
        )
        print(result)
    finally:
        # Always release the HTTP client, including on connection/HTTP errors.
        await client.close()
# Run the demo request only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    asyncio.run(main())