test_mcp_server.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443
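"""
Integration tests for the MCP server started with run_mcp.

The module spins up the MCP server over streamable-http in a background thread, then
verifies that an Agent configured with an MCP client (MCPServerStreamableHttp, or
MCPServerStdio for the stdio case) can discover the available tools and invoke one of
them. Further tests cover run_mcp error handling, the auth middleware, and the
stateless/stateful transport defaults.
"""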
  7. import logging
  8. import os
  9. import socket
  10. import sys
  11. import tempfile
  12. import threading
  13. import time
  14. from pathlib import Path
  15. from typing import Any
  16. from unittest.mock import AsyncMock, MagicMock, patch
  17. import httpx
  18. import pytest
  19. from agents import ModelSettings
  20. from agents.mcp.server import (
  21. MCPServerStdio,
  22. MCPServerStdioParams,
  23. MCPServerStreamableHttp,
  24. MCPServerStreamableHttpParams,
  25. )
  26. from fastmcp import FastMCP
  27. from agency_swarm import Agency, Agent, run_mcp
  28. from tests.data.tools.sample_tool import sample_tool
  29. def _tools_dir() -> str:
  30. # Use sample tools bundled with tests
  31. return str(Path(__file__).parents[2] / "data" / "tools")
  32. def _reserve_port(host: str = "127.0.0.1") -> int:
  33. with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
  34. sock.bind((host, 0))
  35. return sock.getsockname()[1]
  36. def _route_methods(app: Any, path: str) -> set[str] | None:
  37. route = next(route for route in app.routes if getattr(route, "path", None) == path)
  38. return getattr(route, "methods", None)
  39. @pytest.fixture(scope="module")
  40. def mcp_http_server():
  41. """Start MCP tools server over HTTP in a background thread."""
  42. port = _reserve_port()
  43. server_url = f"http://127.0.0.1:{port}"
  44. def _run_server():
  45. # Disable auth for the test by using an empty env var name
  46. run_mcp(
  47. tools=_tools_dir(),
  48. host="127.0.0.1",
  49. port=port,
  50. app_token_env="", # no auth
  51. server_name="mcp-tools-server",
  52. transport="streamable-http",
  53. )
  54. thread = threading.Thread(target=_run_server, daemon=True)
  55. thread.start()
  56. # Wait for server to start
  57. max_retries = 30
  58. for i in range(max_retries):
  59. try:
  60. # Any response indicates the server is listening; endpoint may not be GET-able
  61. resp = httpx.get(server_url + "/mcp", timeout=2.0)
  62. if resp.status_code in (200, 400, 404, 405):
  63. # Give it a brief moment more to fully initialize
  64. time.sleep(0.5)
  65. break
  66. except Exception:
  67. time.sleep(0.5)
  68. if i == max_retries - 1:
  69. pytest.skip("Could not start MCP HTTP server")
  70. yield server_url
  71. # No explicit shutdown; thread is daemon and server ends with process
  72. def _make_agency_with_local_mcp(server_url: str) -> Agency:
  73. """Create an Agency with a local MCP client pointing to the HTTP server."""
  74. mcp_client = MCPServerStreamableHttp(
  75. name="Local_MCP_Server",
  76. params=MCPServerStreamableHttpParams(
  77. url=server_url + "/mcp",
  78. headers={},
  79. ),
  80. cache_tools_list=True,
  81. )
  82. agent = Agent(
  83. name="MCP HTTP Agent",
  84. description="Agent using local MCP tools over HTTP",
  85. mcp_servers=[mcp_client],
  86. model_settings=ModelSettings(temperature=0),
  87. )
  88. return Agency(agent, name="mcp_http_agency", shared_instructions="Test MCP HTTP Integration")
  89. @pytest.mark.asyncio
  90. async def test_mcp_http_tools_list(mcp_http_server):
  91. """Verify the agent can discover tools exposed by the MCP HTTP server."""
  92. agency = _make_agency_with_local_mcp(mcp_http_server)
  93. res = await agency.get_response("What tools do you have?")
  94. text = str(res.final_output).lower()
  95. # sample_tool is provided by tests/data/tools/sample_tool.py
  96. assert "sample_tool" in text or "sample tool" in text
  97. @pytest.mark.asyncio
  98. async def test_mcp_http_invoke_sample_tool(mcp_http_server):
  99. """Verify the agent can invoke a local MCP tool over HTTP."""
  100. agency = _make_agency_with_local_mcp(mcp_http_server)
  101. res = await agency.get_response("Use sample_tool to echo 'hello mcp'.")
  102. assert "echo" in str(res.final_output).lower()
  103. @pytest.mark.asyncio
  104. async def test_mcp_http_error_cases():
  105. """Test error handling in run_mcp function."""
  106. # Test empty tools list
  107. with pytest.raises(ValueError, match="No tools provided"):
  108. run_mcp(tools=[], return_app=True)
  109. # Test empty directory
  110. empty_dir = Path(__file__).parent / "empty_test_dir"
  111. empty_dir.mkdir(exist_ok=True)
  112. try:
  113. with pytest.raises(ValueError, match="No BaseTool classes found in directory"):
  114. run_mcp(tools=str(empty_dir), return_app=True)
  115. finally:
  116. empty_dir.rmdir()
  117. # Test duplicate tool names
  118. duplicate_tool = sample_tool # Same tool twice
  119. with pytest.raises(ValueError, match="Duplicate tool name detected"):
  120. run_mcp(tools=[sample_tool, duplicate_tool], return_app=True)
  121. @pytest.mark.asyncio
  122. async def test_mcp_stdio_server_integration():
  123. """Test MCP stdio server integration with agent."""
  124. # Create a temporary MCP server script with inline tool definition
  125. server_script = """
  126. import sys
  127. import os
  128. sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
  129. from agents import function_tool
  130. from agency_swarm.integrations.mcp_server import run_mcp
  131. @function_tool
  132. def test_sample_tool(text: str) -> str:
  133. \"\"\"Echo tool that returns the input text.\"\"\"
  134. return f"Echo: {text}"
  135. if __name__ == "__main__":
  136. run_mcp(tools=[test_sample_tool], transport="stdio")
  137. """
  138. with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:
  139. f.write(server_script)
  140. server_path = f.name
  141. try:
  142. # Set up MCP client pointing to our stdio server
  143. stdio_server = MCPServerStdio(
  144. name="Test_Stdio_Server",
  145. params=MCPServerStdioParams(
  146. command=sys.executable,
  147. args=[server_path],
  148. ),
  149. client_session_timeout_seconds=10,
  150. )
  151. agent = Agent(
  152. name="MCP Stdio Agent",
  153. model_settings=ModelSettings(temperature=0),
  154. mcp_servers=[stdio_server],
  155. )
  156. agency = Agency(
  157. agent,
  158. name="mcp_stdio_test_agency",
  159. shared_instructions="Test MCP stdio server integration",
  160. )
  161. # Test that agent can list tools from stdio server
  162. res = await agency.get_response("What tools do you have available?")
  163. response_text = str(res.final_output).lower()
  164. # Should find test_sample_tool from our stdio server
  165. assert "test_sample_tool" in response_text or "test sample tool" in response_text
  166. finally:
  167. # Cleanup
  168. Path(server_path).unlink(missing_ok=True)
  169. def test_mcp_with_auth_token():
  170. """Test authentication middleware setup."""
  171. # Set a test token
  172. os.environ["TEST_MCP_TOKEN"] = "test-token-123"
  173. try:
  174. app = run_mcp(tools=[sample_tool], app_token_env="TEST_MCP_TOKEN", transport="streamable-http", return_app=True)
  175. assert app is not None
  176. # Verify auth middleware was actually added
  177. assert len(app.middleware) > 0, "Auth middleware should be added when token is provided"
  178. middleware = next((item for item in app.middleware if hasattr(item, "expected")), None)
  179. # Check that it's the StaticBearer middleware with correct token
  180. assert middleware is not None, "Auth middleware should have 'expected' attribute"
  181. assert middleware.expected == "Bearer test-token-123", "Middleware should expect correct Bearer token"
  182. finally:
  183. del os.environ["TEST_MCP_TOKEN"]
  184. def test_mcp_stdio_with_auth_warning(caplog):
  185. """Test stdio transport with auth warning."""
  186. # Set a test token for stdio (should trigger warning)
  187. os.environ["TEST_STDIO_TOKEN"] = "test-token-456"
  188. try:
  189. with caplog.at_level(logging.WARNING):
  190. app = run_mcp(tools=[sample_tool], app_token_env="TEST_STDIO_TOKEN", transport="stdio", return_app=True)
  191. assert app is not None
  192. assert any("Stdio servers do not support authentication" in record.message for record in caplog.records)
  193. finally:
  194. del os.environ["TEST_STDIO_TOKEN"]
  195. @pytest.mark.parametrize("transport", ["http", "streamable-http"])
  196. def test_run_mcp_direct_http_uses_stateless_mode(transport):
  197. """Direct FastMCP HTTP runs preserve Agency Swarm's previous stateless default."""
  198. from agency_swarm import run_mcp
  199. with patch("agency_swarm.integrations.mcp_server.FastMCP.run") as run_mock:
  200. run_mcp(
  201. tools=[sample_tool],
  202. app_token_env="", # no auth middleware
  203. transport=transport,
  204. uvicorn_config={"timeout_graceful_shutdown": 1},
  205. )
  206. run_mock.assert_called_once()
  207. _, kwargs = run_mock.call_args
  208. assert kwargs["transport"] == transport
  209. assert kwargs["uvicorn_config"] == {"timeout_graceful_shutdown": 1}
  210. assert kwargs["stateless_http"] is True
  211. def test_run_mcp_preserves_sse_stateful_transport():
  212. """SSE transport does not support FastMCP stateless mode."""
  213. from agency_swarm import run_mcp
  214. with patch("agency_swarm.integrations.mcp_server.FastMCP.run") as run_mock:
  215. run_mcp(
  216. tools=[sample_tool],
  217. app_token_env="", # no auth middleware
  218. transport="sse",
  219. uvicorn_config={"timeout_graceful_shutdown": 1},
  220. )
  221. run_mock.assert_called_once()
  222. _, kwargs = run_mock.call_args
  223. assert kwargs["transport"] == "sse"
  224. assert kwargs["uvicorn_config"] == {"timeout_graceful_shutdown": 1}
  225. assert kwargs["stateless_http"] is False
  226. def test_returned_mcp_app_preserves_fastmcp_interface() -> None:
  227. """Returned apps keep the FastMCP interface for caller-owned serving."""
  228. from agency_swarm import run_mcp
  229. app = run_mcp(tools=[sample_tool], app_token_env="", transport="streamable-http", return_app=True)
  230. assert isinstance(app, FastMCP)
  231. @pytest.mark.parametrize("transport", ["http", "streamable-http"])
  232. def test_returned_mcp_app_http_app_uses_agency_swarm_stateless_default(transport: str) -> None:
  233. """Returned apps preserve Agency Swarm's previous stateless HTTP default."""
  234. from agency_swarm import run_mcp
  235. app = run_mcp(tools=[sample_tool], app_token_env="", transport=transport, return_app=True)
  236. http_app = app.http_app(transport=transport)
  237. assert http_app.state.transport_type == "streamable-http"
  238. assert _route_methods(http_app, "/mcp") == {"POST", "DELETE"}
  239. def test_returned_mcp_app_http_app_allows_stateful_http_override() -> None:
  240. """Caller-owned returned apps can still opt into FastMCP's stateful HTTP mode."""
  241. from agency_swarm import run_mcp
  242. app = run_mcp(tools=[sample_tool], app_token_env="", transport="streamable-http", return_app=True)
  243. http_app = app.http_app(transport="streamable-http", stateless_http=False)
  244. assert http_app.state.transport_type == "streamable-http"
  245. assert _route_methods(http_app, "/mcp") is None
  246. @pytest.mark.parametrize("transport", ["http", "streamable-http"])
  247. def test_returned_mcp_app_run_uses_agency_swarm_stateless_default(transport: str) -> None:
  248. """Returned app .run() preserves Agency Swarm's previous stateless HTTP default."""
  249. from agency_swarm import run_mcp
  250. app = run_mcp(tools=[sample_tool], app_token_env="", transport=transport, return_app=True)
  251. with patch("agency_swarm.integrations.mcp_server.FastMCP.run_http_async", new_callable=AsyncMock) as run_mock:
  252. app.run(transport=transport, show_banner=False)
  253. run_mock.assert_awaited_once()
  254. _, kwargs = run_mock.await_args
  255. assert kwargs["transport"] == transport
  256. assert kwargs["show_banner"] is False
  257. assert kwargs["stateless_http"] is True
  258. def test_returned_mcp_app_run_preserves_sse_stateful_default() -> None:
  259. """Returned app .run() keeps SSE stateful even when FastMCP defaults are stateless."""
  260. from fastmcp import settings as fastmcp_settings
  261. from agency_swarm import run_mcp
  262. app = run_mcp(tools=[sample_tool], app_token_env="", transport="sse", return_app=True)
  263. with (
  264. patch.object(fastmcp_settings, "stateless_http", True),
  265. patch("agency_swarm.integrations.mcp_server.FastMCP.run_http_async", new_callable=AsyncMock) as run_mock,
  266. ):
  267. app.run(transport="sse", show_banner=False)
  268. run_mock.assert_awaited_once()
  269. _, kwargs = run_mock.await_args
  270. assert kwargs["transport"] == "sse"
  271. assert kwargs["show_banner"] is False
  272. assert kwargs["stateless_http"] is False
  273. def test_mcp_unsupported_tool_type():
  274. """Test unsupported tool type error."""
  275. # Create a mock tool that's neither BaseTool nor FunctionTool
  276. class UnsupportedTool:
  277. name = "unsupported_tool"
  278. with pytest.raises(ValueError, match="Unexpected tool type"):
  279. run_mcp(tools=[UnsupportedTool()], return_app=True)
  280. def test_mcp_base_tool_conversion():
  281. """Test BaseTool to FunctionTool conversion."""
  282. from pydantic import Field
  283. from agency_swarm import BaseTool
  284. from agency_swarm.integrations.mcp_server import run_mcp
  285. class TestBaseTool(BaseTool):
  286. """A test BaseTool for conversion testing."""
  287. input_text: str = Field(..., description="Input text")
  288. def run(self):
  289. return f"BaseTool result: {self.input_text}"
  290. app = run_mcp(tools=[TestBaseTool], return_app=True)
  291. assert app is not None
  292. @pytest.mark.asyncio
  293. async def test_mcp_auth_middleware_methods():
  294. """Test authentication middleware on_request and on_read_resource methods."""
  295. # Set up environment for auth
  296. os.environ["TEST_AUTH_TOKEN"] = "test-auth-token"
  297. try:
  298. app = run_mcp(
  299. tools=[sample_tool], app_token_env="TEST_AUTH_TOKEN", transport="streamable-http", return_app=True
  300. )
  301. # Get the auth middleware that was added
  302. middleware = next((item for item in app.middleware if hasattr(item, "expected")), None)
  303. assert middleware is not None
  304. # Test on_request method with correct auth
  305. mock_ctx = MagicMock()
  306. mock_call_next = AsyncMock(return_value="success")
  307. # Test with correct authorization header by patching the module where it's imported
  308. with patch(
  309. "agency_swarm.integrations.mcp_server.get_http_headers",
  310. return_value={"authorization": "Bearer test-auth-token"},
  311. ):
  312. result = await middleware.on_request(mock_ctx, mock_call_next)
  313. assert result == "success"
  314. mock_call_next.assert_called_once()
  315. # Test on_read_resource method with correct auth
  316. mock_call_next.reset_mock()
  317. result = await middleware.on_read_resource(mock_ctx, mock_call_next)
  318. assert result == "success"
  319. mock_call_next.assert_called_once()
  320. # Test with incorrect authorization (should raise McpError)
  321. mock_call_next.reset_mock()
  322. with patch(
  323. "agency_swarm.integrations.mcp_server.get_http_headers",
  324. return_value={"authorization": "Bearer wrong-token"},
  325. ):
  326. from fastmcp.exceptions import McpError
  327. with pytest.raises(McpError):
  328. await middleware.on_request(mock_ctx, mock_call_next)
  329. with pytest.raises(McpError):
  330. await middleware.on_read_resource(mock_ctx, mock_call_next)
  331. finally:
  332. del os.environ["TEST_AUTH_TOKEN"]