| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443 |
- """
- Integration test for the MCP HTTP server using run_mcp and HostedMCPTool.
- This test spins up the MCP server over streamable-http, then verifies that an
- Agent configured with a HostedMCPTool can discover available tools and invoke
- one of them.
- """
- import logging
- import os
- import socket
- import sys
- import tempfile
- import threading
- import time
- from pathlib import Path
- from typing import Any
- from unittest.mock import AsyncMock, MagicMock, patch
- import httpx
- import pytest
- from agents import ModelSettings
- from agents.mcp.server import (
- MCPServerStdio,
- MCPServerStdioParams,
- MCPServerStreamableHttp,
- MCPServerStreamableHttpParams,
- )
- from fastmcp import FastMCP
- from agency_swarm import Agency, Agent, run_mcp
- from tests.data.tools.sample_tool import sample_tool
- def _tools_dir() -> str:
- # Use sample tools bundled with tests
- return str(Path(__file__).parents[2] / "data" / "tools")
- def _reserve_port(host: str = "127.0.0.1") -> int:
- with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
- sock.bind((host, 0))
- return sock.getsockname()[1]
- def _route_methods(app: Any, path: str) -> set[str] | None:
- route = next(route for route in app.routes if getattr(route, "path", None) == path)
- return getattr(route, "methods", None)
- @pytest.fixture(scope="module")
- def mcp_http_server():
- """Start MCP tools server over HTTP in a background thread."""
- port = _reserve_port()
- server_url = f"http://127.0.0.1:{port}"
- def _run_server():
- # Disable auth for the test by using an empty env var name
- run_mcp(
- tools=_tools_dir(),
- host="127.0.0.1",
- port=port,
- app_token_env="", # no auth
- server_name="mcp-tools-server",
- transport="streamable-http",
- )
- thread = threading.Thread(target=_run_server, daemon=True)
- thread.start()
- # Wait for server to start
- max_retries = 30
- for i in range(max_retries):
- try:
- # Any response indicates the server is listening; endpoint may not be GET-able
- resp = httpx.get(server_url + "/mcp", timeout=2.0)
- if resp.status_code in (200, 400, 404, 405):
- # Give it a brief moment more to fully initialize
- time.sleep(0.5)
- break
- except Exception:
- time.sleep(0.5)
- if i == max_retries - 1:
- pytest.skip("Could not start MCP HTTP server")
- yield server_url
- # No explicit shutdown; thread is daemon and server ends with process
- def _make_agency_with_local_mcp(server_url: str) -> Agency:
- """Create an Agency with a local MCP client pointing to the HTTP server."""
- mcp_client = MCPServerStreamableHttp(
- name="Local_MCP_Server",
- params=MCPServerStreamableHttpParams(
- url=server_url + "/mcp",
- headers={},
- ),
- cache_tools_list=True,
- )
- agent = Agent(
- name="MCP HTTP Agent",
- description="Agent using local MCP tools over HTTP",
- mcp_servers=[mcp_client],
- model_settings=ModelSettings(temperature=0),
- )
- return Agency(agent, name="mcp_http_agency", shared_instructions="Test MCP HTTP Integration")
- @pytest.mark.asyncio
- async def test_mcp_http_tools_list(mcp_http_server):
- """Verify the agent can discover tools exposed by the MCP HTTP server."""
- agency = _make_agency_with_local_mcp(mcp_http_server)
- res = await agency.get_response("What tools do you have?")
- text = str(res.final_output).lower()
- # sample_tool is provided by tests/data/tools/sample_tool.py
- assert "sample_tool" in text or "sample tool" in text
- @pytest.mark.asyncio
- async def test_mcp_http_invoke_sample_tool(mcp_http_server):
- """Verify the agent can invoke a local MCP tool over HTTP."""
- agency = _make_agency_with_local_mcp(mcp_http_server)
- res = await agency.get_response("Use sample_tool to echo 'hello mcp'.")
- assert "echo" in str(res.final_output).lower()
- @pytest.mark.asyncio
- async def test_mcp_http_error_cases():
- """Test error handling in run_mcp function."""
- # Test empty tools list
- with pytest.raises(ValueError, match="No tools provided"):
- run_mcp(tools=[], return_app=True)
- # Test empty directory
- empty_dir = Path(__file__).parent / "empty_test_dir"
- empty_dir.mkdir(exist_ok=True)
- try:
- with pytest.raises(ValueError, match="No BaseTool classes found in directory"):
- run_mcp(tools=str(empty_dir), return_app=True)
- finally:
- empty_dir.rmdir()
- # Test duplicate tool names
- duplicate_tool = sample_tool # Same tool twice
- with pytest.raises(ValueError, match="Duplicate tool name detected"):
- run_mcp(tools=[sample_tool, duplicate_tool], return_app=True)
- @pytest.mark.asyncio
- async def test_mcp_stdio_server_integration():
- """Test MCP stdio server integration with agent."""
- # Create a temporary MCP server script with inline tool definition
- server_script = """
- import sys
- import os
- sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
- from agents import function_tool
- from agency_swarm.integrations.mcp_server import run_mcp
- @function_tool
- def test_sample_tool(text: str) -> str:
- \"\"\"Echo tool that returns the input text.\"\"\"
- return f"Echo: {text}"
- if __name__ == "__main__":
- run_mcp(tools=[test_sample_tool], transport="stdio")
- """
- with tempfile.NamedTemporaryFile(mode="w", suffix=".py", delete=False) as f:
- f.write(server_script)
- server_path = f.name
- try:
- # Set up MCP client pointing to our stdio server
- stdio_server = MCPServerStdio(
- name="Test_Stdio_Server",
- params=MCPServerStdioParams(
- command=sys.executable,
- args=[server_path],
- ),
- client_session_timeout_seconds=10,
- )
- agent = Agent(
- name="MCP Stdio Agent",
- model_settings=ModelSettings(temperature=0),
- mcp_servers=[stdio_server],
- )
- agency = Agency(
- agent,
- name="mcp_stdio_test_agency",
- shared_instructions="Test MCP stdio server integration",
- )
- # Test that agent can list tools from stdio server
- res = await agency.get_response("What tools do you have available?")
- response_text = str(res.final_output).lower()
- # Should find test_sample_tool from our stdio server
- assert "test_sample_tool" in response_text or "test sample tool" in response_text
- finally:
- # Cleanup
- Path(server_path).unlink(missing_ok=True)
- def test_mcp_with_auth_token():
- """Test authentication middleware setup."""
- # Set a test token
- os.environ["TEST_MCP_TOKEN"] = "test-token-123"
- try:
- app = run_mcp(tools=[sample_tool], app_token_env="TEST_MCP_TOKEN", transport="streamable-http", return_app=True)
- assert app is not None
- # Verify auth middleware was actually added
- assert len(app.middleware) > 0, "Auth middleware should be added when token is provided"
- middleware = next((item for item in app.middleware if hasattr(item, "expected")), None)
- # Check that it's the StaticBearer middleware with correct token
- assert middleware is not None, "Auth middleware should have 'expected' attribute"
- assert middleware.expected == "Bearer test-token-123", "Middleware should expect correct Bearer token"
- finally:
- del os.environ["TEST_MCP_TOKEN"]
- def test_mcp_stdio_with_auth_warning(caplog):
- """Test stdio transport with auth warning."""
- # Set a test token for stdio (should trigger warning)
- os.environ["TEST_STDIO_TOKEN"] = "test-token-456"
- try:
- with caplog.at_level(logging.WARNING):
- app = run_mcp(tools=[sample_tool], app_token_env="TEST_STDIO_TOKEN", transport="stdio", return_app=True)
- assert app is not None
- assert any("Stdio servers do not support authentication" in record.message for record in caplog.records)
- finally:
- del os.environ["TEST_STDIO_TOKEN"]
- @pytest.mark.parametrize("transport", ["http", "streamable-http"])
- def test_run_mcp_direct_http_uses_stateless_mode(transport):
- """Direct FastMCP HTTP runs preserve Agency Swarm's previous stateless default."""
- from agency_swarm import run_mcp
- with patch("agency_swarm.integrations.mcp_server.FastMCP.run") as run_mock:
- run_mcp(
- tools=[sample_tool],
- app_token_env="", # no auth middleware
- transport=transport,
- uvicorn_config={"timeout_graceful_shutdown": 1},
- )
- run_mock.assert_called_once()
- _, kwargs = run_mock.call_args
- assert kwargs["transport"] == transport
- assert kwargs["uvicorn_config"] == {"timeout_graceful_shutdown": 1}
- assert kwargs["stateless_http"] is True
- def test_run_mcp_preserves_sse_stateful_transport():
- """SSE transport does not support FastMCP stateless mode."""
- from agency_swarm import run_mcp
- with patch("agency_swarm.integrations.mcp_server.FastMCP.run") as run_mock:
- run_mcp(
- tools=[sample_tool],
- app_token_env="", # no auth middleware
- transport="sse",
- uvicorn_config={"timeout_graceful_shutdown": 1},
- )
- run_mock.assert_called_once()
- _, kwargs = run_mock.call_args
- assert kwargs["transport"] == "sse"
- assert kwargs["uvicorn_config"] == {"timeout_graceful_shutdown": 1}
- assert kwargs["stateless_http"] is False
- def test_returned_mcp_app_preserves_fastmcp_interface() -> None:
- """Returned apps keep the FastMCP interface for caller-owned serving."""
- from agency_swarm import run_mcp
- app = run_mcp(tools=[sample_tool], app_token_env="", transport="streamable-http", return_app=True)
- assert isinstance(app, FastMCP)
- @pytest.mark.parametrize("transport", ["http", "streamable-http"])
- def test_returned_mcp_app_http_app_uses_agency_swarm_stateless_default(transport: str) -> None:
- """Returned apps preserve Agency Swarm's previous stateless HTTP default."""
- from agency_swarm import run_mcp
- app = run_mcp(tools=[sample_tool], app_token_env="", transport=transport, return_app=True)
- http_app = app.http_app(transport=transport)
- assert http_app.state.transport_type == "streamable-http"
- assert _route_methods(http_app, "/mcp") == {"POST", "DELETE"}
- def test_returned_mcp_app_http_app_allows_stateful_http_override() -> None:
- """Caller-owned returned apps can still opt into FastMCP's stateful HTTP mode."""
- from agency_swarm import run_mcp
- app = run_mcp(tools=[sample_tool], app_token_env="", transport="streamable-http", return_app=True)
- http_app = app.http_app(transport="streamable-http", stateless_http=False)
- assert http_app.state.transport_type == "streamable-http"
- assert _route_methods(http_app, "/mcp") is None
- @pytest.mark.parametrize("transport", ["http", "streamable-http"])
- def test_returned_mcp_app_run_uses_agency_swarm_stateless_default(transport: str) -> None:
- """Returned app .run() preserves Agency Swarm's previous stateless HTTP default."""
- from agency_swarm import run_mcp
- app = run_mcp(tools=[sample_tool], app_token_env="", transport=transport, return_app=True)
- with patch("agency_swarm.integrations.mcp_server.FastMCP.run_http_async", new_callable=AsyncMock) as run_mock:
- app.run(transport=transport, show_banner=False)
- run_mock.assert_awaited_once()
- _, kwargs = run_mock.await_args
- assert kwargs["transport"] == transport
- assert kwargs["show_banner"] is False
- assert kwargs["stateless_http"] is True
- def test_returned_mcp_app_run_preserves_sse_stateful_default() -> None:
- """Returned app .run() keeps SSE stateful even when FastMCP defaults are stateless."""
- from fastmcp import settings as fastmcp_settings
- from agency_swarm import run_mcp
- app = run_mcp(tools=[sample_tool], app_token_env="", transport="sse", return_app=True)
- with (
- patch.object(fastmcp_settings, "stateless_http", True),
- patch("agency_swarm.integrations.mcp_server.FastMCP.run_http_async", new_callable=AsyncMock) as run_mock,
- ):
- app.run(transport="sse", show_banner=False)
- run_mock.assert_awaited_once()
- _, kwargs = run_mock.await_args
- assert kwargs["transport"] == "sse"
- assert kwargs["show_banner"] is False
- assert kwargs["stateless_http"] is False
- def test_mcp_unsupported_tool_type():
- """Test unsupported tool type error."""
- # Create a mock tool that's neither BaseTool nor FunctionTool
- class UnsupportedTool:
- name = "unsupported_tool"
- with pytest.raises(ValueError, match="Unexpected tool type"):
- run_mcp(tools=[UnsupportedTool()], return_app=True)
- def test_mcp_base_tool_conversion():
- """Test BaseTool to FunctionTool conversion."""
- from pydantic import Field
- from agency_swarm import BaseTool
- from agency_swarm.integrations.mcp_server import run_mcp
- class TestBaseTool(BaseTool):
- """A test BaseTool for conversion testing."""
- input_text: str = Field(..., description="Input text")
- def run(self):
- return f"BaseTool result: {self.input_text}"
- app = run_mcp(tools=[TestBaseTool], return_app=True)
- assert app is not None
- @pytest.mark.asyncio
- async def test_mcp_auth_middleware_methods():
- """Test authentication middleware on_request and on_read_resource methods."""
- # Set up environment for auth
- os.environ["TEST_AUTH_TOKEN"] = "test-auth-token"
- try:
- app = run_mcp(
- tools=[sample_tool], app_token_env="TEST_AUTH_TOKEN", transport="streamable-http", return_app=True
- )
- # Get the auth middleware that was added
- middleware = next((item for item in app.middleware if hasattr(item, "expected")), None)
- assert middleware is not None
- # Test on_request method with correct auth
- mock_ctx = MagicMock()
- mock_call_next = AsyncMock(return_value="success")
- # Test with correct authorization header by patching the module where it's imported
- with patch(
- "agency_swarm.integrations.mcp_server.get_http_headers",
- return_value={"authorization": "Bearer test-auth-token"},
- ):
- result = await middleware.on_request(mock_ctx, mock_call_next)
- assert result == "success"
- mock_call_next.assert_called_once()
- # Test on_read_resource method with correct auth
- mock_call_next.reset_mock()
- result = await middleware.on_read_resource(mock_ctx, mock_call_next)
- assert result == "success"
- mock_call_next.assert_called_once()
- # Test with incorrect authorization (should raise McpError)
- mock_call_next.reset_mock()
- with patch(
- "agency_swarm.integrations.mcp_server.get_http_headers",
- return_value={"authorization": "Bearer wrong-token"},
- ):
- from fastmcp.exceptions import McpError
- with pytest.raises(McpError):
- await middleware.on_request(mock_ctx, mock_call_next)
- with pytest.raises(McpError):
- await middleware.on_read_resource(mock_ctx, mock_call_next)
- finally:
- del os.environ["TEST_AUTH_TOKEN"]
|