| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556 |
- """Integration tests for Agency shared resources (shared_tools, shared_files_folder, shared_mcp_servers)."""
- import os
- import sys
- import time
- from pathlib import Path
- from unittest.mock import patch
- import pytest
- from agents import function_tool
- from agents.mcp.server import MCPServerStdio
- from pydantic import Field
- from agency_swarm import Agency, Agent
- from agency_swarm.tools import BaseTool
@pytest.fixture
def temp_tools_folder(tmp_path: Path) -> Path:
    """Provide a scratch ``shared_tools`` directory holding one tool module.

    The directory is created under ``tmp_path`` and contains a single file,
    ``SampleTool.py``, which defines a ``SampleTool`` BaseTool subclass whose
    ``run`` echoes its ``message`` field.

    Returns:
        Path of the created ``shared_tools`` directory.
    """
    # Source of the tool module written to disk; it must be valid Python on
    # its own, so it is kept flush-left inside the literal.
    sample_tool_source = '''
from agency_swarm.tools import BaseTool
from pydantic import Field
class SampleTool(BaseTool):
    """A sample tool for testing shared tools folder."""
    message: str = Field(..., description="Message to echo")
    def run(self) -> str:
        return f"Echo: {self.message}"
'''
    tools_folder = tmp_path / "shared_tools"
    tools_folder.mkdir()
    (tools_folder / "SampleTool.py").write_text(sample_tool_source)
    return tools_folder
@pytest.fixture
def temp_files_folder(tmp_path: Path) -> Path:
    """Provide a scratch ``shared_files`` directory seeded with three documents.

    Returns:
        Path of the created ``shared_files`` directory under ``tmp_path``.
    """
    # File name -> content for the documents used by the file_search tests.
    seed_documents = {
        "sample.txt": "This is a sample file for testing shared files.",
        "data.md": "# Sample Markdown\n\nSome content here.",
        "secrets.txt": "The secret code is ALPHA-BRAVO-CHARLIE-123.",
    }
    files_folder = tmp_path / "shared_files"
    files_folder.mkdir()
    for filename, content in seed_documents.items():
        (files_folder / filename).write_text(content)
    return files_folder
@pytest.fixture
def basic_agents() -> tuple[Agent, Agent]:
    """Provide the ``(AgentA, AgentB)`` pair used by most tests.

    The pair is built by ``_create_fresh_agents`` so the agent names,
    instructions and model are defined in exactly one place instead of being
    duplicated between this fixture and that helper.

    Returns:
        A two-tuple ``(agent_a, agent_b)`` of newly constructed agents.
    """
    return _create_fresh_agents()
def _create_fresh_agents() -> tuple[Agent, Agent]:
    """Build a brand-new ``(AgentA, AgentB)`` pair on every call.

    Unlike a fixture value, the result is constructed at the call site, so a
    single test can call this more than once to obtain independent instances.

    Returns:
        A two-tuple ``(agent_a, agent_b)``.
    """
    model_name = "gpt-5.4-mini"
    first = Agent(
        name="AgentA",
        instructions="You are Agent A. Use file_search when asked about documents.",
        model=model_name,
    )
    second = Agent(
        name="AgentB",
        instructions="You are Agent B. Use file_search when asked about documents.",
        model=model_name,
    )
    return first, second
class TestSharedTools:
    """Checks for the ``shared_tools`` Agency parameter."""

    def test_shared_function_tool_added_to_all_agents(self, basic_agents: tuple[Agent, Agent]):
        """A FunctionTool passed via ``shared_tools`` appears on both agents."""
        first_agent, second_agent = basic_agents

        @function_tool
        def shared_search(query: str) -> str:
            """Search for information."""
            return f"Results for: {query}"

        agency = Agency(
            first_agent,
            shared_tools=[shared_search],
            communication_flows=[(first_agent, second_agent)],
        )

        # The tool must be registered under its name on each agent.
        for agent_name in ("AgentA", "AgentB"):
            names = [getattr(tool, "name", None) for tool in agency.agents[agent_name].tools]
            assert "shared_search" in names

    def test_shared_function_tool_instances_are_independent(self, basic_agents: tuple[Agent, Agent]):
        """Each agent receives its own FunctionTool object, distinct from the original."""
        first_agent, second_agent = basic_agents

        @function_tool
        def shared_tool(x: str) -> str:
            """A shared tool."""
            return x

        agency = Agency(
            first_agent,
            shared_tools=[shared_tool],
            communication_flows=[(first_agent, second_agent)],
        )

        def copies_on(agent_name: str) -> list:
            """Collect the tools named ``shared_tool`` registered on one agent."""
            return [
                tool
                for tool in agency.agents[agent_name].tools
                if getattr(tool, "name", None) == "shared_tool"
            ]

        copies_a = copies_on("AgentA")
        copies_b = copies_on("AgentB")
        assert len(copies_a) == 1
        assert len(copies_b) == 1

        copy_a, copy_b = copies_a[0], copies_b[0]
        # The two agents must not hold the very same object ...
        assert copy_a is not copy_b
        # ... and neither may hold the object that was passed in.
        assert copy_a is not shared_tool
        assert copy_b is not shared_tool

    def test_shared_base_tool_class_adapted_for_each_agent(self, basic_agents: tuple[Agent, Agent]):
        """A BaseTool subclass passed via ``shared_tools`` is exposed by name on both agents."""
        first_agent, second_agent = basic_agents

        class SharedBaseTool(BaseTool):
            """A shared BaseTool class."""

            param: str = Field(..., description="A parameter")

            def run(self) -> str:
                return f"Result: {self.param}"

        agency = Agency(
            first_agent,
            shared_tools=[SharedBaseTool],
            communication_flows=[(first_agent, second_agent)],
        )

        for agent_name in ("AgentA", "AgentB"):
            names = [getattr(tool, "name", None) for tool in agency.agents[agent_name].tools]
            assert "SharedBaseTool" in names
class TestSharedToolsFolder:
    """Checks for the ``shared_tools_folder`` Agency parameter."""

    def test_tools_loaded_from_folder(self, basic_agents: tuple[Agent, Agent], temp_tools_folder: Path):
        """The tool defined in the folder's ``SampleTool.py`` ends up on both agents."""
        first_agent, second_agent = basic_agents

        agency = Agency(
            first_agent,
            shared_tools_folder=str(temp_tools_folder),
            communication_flows=[(first_agent, second_agent)],
        )

        for agent_name in ("AgentA", "AgentB"):
            names = [getattr(tool, "name", None) for tool in agency.agents[agent_name].tools]
            assert "SampleTool" in names
class TestSharedFilesFolder:
    """Checks for the ``shared_files_folder`` Agency parameter that need no live API."""

    def test_shared_files_skipped_in_dry_run(self, basic_agents: tuple[Agent, Agent], temp_files_folder: Path):
        """With ``DRY_RUN=1`` the shared files folder is left in place and not renamed."""
        first_agent, second_agent = basic_agents

        with patch.dict(os.environ, {"DRY_RUN": "1"}):
            _agency = Agency(
                first_agent,
                shared_files_folder=str(temp_files_folder),
                communication_flows=[(first_agent, second_agent)],
            )

        # The original directory is still there ...
        assert temp_files_folder.exists()

        # ... and no sibling directory carrying a "_vs_" suffix was created.
        renamed_candidates = list(temp_files_folder.parent.glob("shared_files_vs_*"))
        assert renamed_candidates == []
class TestSharedMCPServers:
    """Checks for the ``shared_mcp_servers`` Agency parameter."""

    def test_shared_mcp_servers_added_to_all_agents(self, basic_agents: tuple[Agent, Agent]):
        """Tools exposed by a shared stdio MCP server appear on both agents.

        After Agency construction each agent is expected to list the ``greet``
        and ``add`` tools and to have an empty ``mcp_servers`` list.
        """
        first_agent, second_agent = basic_agents

        # The server script is located relative to this test file.
        server_script = Path(__file__).parents[2].joinpath("data", "scripts", "stdio_server.py")
        launch_params = {
            "command": sys.executable,
            "args": [str(server_script)],
        }
        stdio_server = MCPServerStdio(
            name=f"shared_mcp_stdio_{os.getpid()}",
            params=launch_params,
            client_session_timeout_seconds=15,
        )

        # shared_mcp_servers are attached during Agency init and converted into tools.
        agency = Agency(
            first_agent,
            shared_mcp_servers=[stdio_server],
            communication_flows=[(first_agent, second_agent)],
        )

        for agent_name in ("AgentA", "AgentB"):
            names = [getattr(tool, "name", None) for tool in agency.agents[agent_name].tools]
            assert "greet" in names
            assert "add" in names

        # Conversion clears the server list after creating tools.
        for agent_name in ("AgentA", "AgentB"):
            assert agency.agents[agent_name].mcp_servers == []
class TestFastAPIFactoryPassesSharedParams:
    """Checks that an Agency keeps its shared-resource arguments as attributes.

    NOTE(review): the class name refers to the FastAPI ``agency_factory``, but
    the test below only inspects attributes on a directly constructed Agency.
    """

    def test_agency_factory_includes_shared_params(self, basic_agents: tuple[Agent, Agent], temp_tools_folder: Path):
        """``shared_tools``, ``shared_tools_folder`` and ``shared_files_folder`` are readable back unchanged."""
        first_agent, second_agent = basic_agents

        @function_tool
        def shared_tool(x: str) -> str:
            """Shared tool."""
            return x

        tools_folder_arg = str(temp_tools_folder)
        # Points at a directory that does not exist; won't be processed in dry run.
        files_folder_arg = str(temp_tools_folder.parent / "nonexistent")

        with patch.dict(os.environ, {"DRY_RUN": "1"}):
            agency = Agency(
                first_agent,
                shared_tools=[shared_tool],
                shared_tools_folder=tools_folder_arg,
                shared_files_folder=files_folder_arg,
                communication_flows=[(first_agent, second_agent)],
            )

        # The values handed to the constructor must be stored as given.
        assert agency.shared_tools == [shared_tool]
        assert agency.shared_tools_folder == tools_folder_arg
        assert agency.shared_files_folder == files_folder_arg
class TestSharedResourcesIntegration:
    """Checks that several shared resource kinds can be combined on one Agency."""

    def test_multiple_shared_resources_together(self, basic_agents: tuple[Agent, Agent], temp_tools_folder: Path):
        """An inline shared tool and a folder-loaded tool both reach every agent."""
        first_agent, second_agent = basic_agents

        @function_tool
        def inline_shared_tool(x: str) -> str:
            """An inline shared tool."""
            return x

        with patch.dict(os.environ, {"DRY_RUN": "1"}):
            agency = Agency(
                first_agent,
                shared_tools=[inline_shared_tool],
                shared_tools_folder=str(temp_tools_folder),
                communication_flows=[(first_agent, second_agent)],
            )

        names_by_agent = {
            agent_name: [getattr(tool, "name", None) for tool in agency.agents[agent_name].tools]
            for agent_name in ("AgentA", "AgentB")
        }

        # Inline shared tool
        assert "inline_shared_tool" in names_by_agent["AgentA"]
        assert "inline_shared_tool" in names_by_agent["AgentB"]

        # Tool from folder
        assert "SampleTool" in names_by_agent["AgentA"]
        assert "SampleTool" in names_by_agent["AgentB"]
class TestSharedFilesFolderLive:
    """Live integration tests for shared_files_folder (requires OpenAI API)."""

    @staticmethod
    def _file_search_vector_store_ids(agent: Agent) -> set:
        """Return the vector store IDs of the agent's file_search tool.

        Scans ``agent.tools`` for the first tool named ``file_search`` that has a
        ``vector_store_ids`` attribute and returns those IDs as a set. Returns an
        empty set when no such tool is found.
        """
        for tool in agent.tools:
            if getattr(tool, "name", None) == "file_search" and hasattr(tool, "vector_store_ids"):
                return set(tool.vector_store_ids)
        return set()

    @staticmethod
    def _vs_id_from_folder(folder: Path, original_name: str) -> str:
        """Return the part of ``folder.name`` that follows ``{original_name}_vs_``.

        Stripping the known prefix (rather than splitting on ``_vs_``) keeps the
        whole suffix even if it happens to contain ``_vs_`` itself.
        """
        return folder.name.removeprefix(f"{original_name}_vs_")

    @staticmethod
    def _poll_for_filename(
        agent: Agent,
        vector_store_id: str,
        expected_filename: str,
        timeout_seconds: float,
    ) -> tuple[list[str], list]:
        """Poll a vector store until a file with the given name is listed.

        Once per second, lists the files of ``vector_store_id`` through
        ``agent.client_sync``, resolves each listed file ID to its filename, and
        stops as soon as ``expected_filename`` is among them or the timeout
        has elapsed.

        Returns:
            ``(filenames, file_ids)`` from the last poll; both are empty lists if
            no poll was performed. The caller asserts on the result.
        """
        deadline = time.monotonic() + timeout_seconds
        filenames: list[str] = []
        file_ids: list = []
        while time.monotonic() < deadline:
            listing = agent.client_sync.vector_stores.files.list(vector_store_id=vector_store_id)
            file_ids = [entry.id for entry in listing.data]
            filenames = [agent.client_sync.files.retrieve(file_id).filename for file_id in file_ids]
            if expected_filename in filenames:
                break
            time.sleep(1)
        return filenames, file_ids

    def test_shared_files_folder_adds_file_search_to_all_agents(self, temp_files_folder: Path):
        """Shared files folder should add FileSearchTool to all agents."""
        agent_a, agent_b = _create_fresh_agents()

        agency = Agency(
            agent_a,
            shared_files_folder=str(temp_files_folder),
            communication_flows=[(agent_a, agent_b)],
        )

        # Both agents should have file_search tool
        for agent_name in ["AgentA", "AgentB"]:
            tool_names = {getattr(t, "name", None) for t in agency.agents[agent_name].tools}
            assert "file_search" in tool_names, f"{agent_name} should have file_search tool"

    def test_shared_files_folder_same_vector_store_for_all_agents(self, temp_files_folder: Path):
        """All agents should share the same vector store ID."""
        agent_a, agent_b = _create_fresh_agents()

        agency = Agency(
            agent_a,
            shared_files_folder=str(temp_files_folder),
            communication_flows=[(agent_a, agent_b)],
        )

        # Union of the vector store IDs referenced by each agent's first
        # file_search tool.
        vs_ids = set()
        for agent_name in ["AgentA", "AgentB"]:
            file_search_tool = next(
                (t for t in agency.agents[agent_name].tools if getattr(t, "name", None) == "file_search"),
                None,
            )
            if file_search_tool is not None and hasattr(file_search_tool, "vector_store_ids"):
                vs_ids.update(file_search_tool.vector_store_ids)

        # All agents should use the same vector store
        assert len(vs_ids) == 1, f"Expected 1 shared vector store, got {len(vs_ids)}: {vs_ids}"

    def test_shared_files_folder_renamed_with_vs_suffix(self, temp_files_folder: Path):
        """Folder should be renamed to include vector store ID suffix."""
        agent_a, agent_b = _create_fresh_agents()
        original_name = temp_files_folder.name  # "shared_files"

        Agency(
            agent_a,
            shared_files_folder=str(temp_files_folder),
            communication_flows=[(agent_a, agent_b)],
        )

        # Original folder should be renamed
        assert not temp_files_folder.exists(), "Original folder should be renamed"

        # Should have a _vs_xxx suffixed folder
        vs_folders = list(temp_files_folder.parent.glob(f"{original_name}_vs_*"))
        assert len(vs_folders) == 1, f"Expected 1 renamed folder, got {len(vs_folders)}"

    def test_shared_files_folder_hot_reload_reuses_vector_store(self, temp_files_folder: Path):
        """Second Agency init with same folder should reuse existing vector store."""
        original_name = temp_files_folder.name

        # First init
        agent_a1, agent_b1 = _create_fresh_agents()
        _agency1 = Agency(
            agent_a1,
            shared_files_folder=str(temp_files_folder),
            communication_flows=[(agent_a1, agent_b1)],
        )

        # Find the renamed folder and read the ID encoded in its name
        vs_folders = list(temp_files_folder.parent.glob(f"{original_name}_vs_*"))
        assert len(vs_folders) == 1
        first_vs_id = self._vs_id_from_folder(vs_folders[0], original_name)

        # Second init with original path (simulates hot reload)
        agent_a2, agent_b2 = _create_fresh_agents()
        _agency2 = Agency(
            agent_a2,
            shared_files_folder=str(temp_files_folder),  # Original path
            communication_flows=[(agent_a2, agent_b2)],
        )

        # Should still be the same folder (no new folders created)
        vs_folders_after = list(temp_files_folder.parent.glob(f"{original_name}_vs_*"))
        assert len(vs_folders_after) == 1, f"Should still have 1 folder, got {len(vs_folders_after)}"

        # Extract VS ID and verify it's the same
        second_vs_id = self._vs_id_from_folder(vs_folders_after[0], original_name)
        assert first_vs_id == second_vs_id, "Vector store ID should be reused"

    @pytest.mark.skipif(
        os.getenv("CI") == "true",
        reason="Requires live OpenAI vector-store ingestion; skipped on CI due to upstream flake.",
    )
    def test_shared_files_folder_hot_reload_uploads_new_files(self, temp_files_folder: Path):
        """New files placed into the original folder path on hot reload should be uploaded."""
        original_name = temp_files_folder.name

        # First init creates VS folder by renaming original.
        agent_a1, agent_b1 = _create_fresh_agents()
        agency1 = Agency(
            agent_a1,
            shared_files_folder=str(temp_files_folder),
            communication_flows=[(agent_a1, agent_b1)],
        )

        # Identify the VS folder and VS id
        vs_folders = list(temp_files_folder.parent.glob(f"{original_name}_vs_*"))
        assert len(vs_folders) == 1
        vs_ids_1 = self._file_search_vector_store_ids(agency1.agents["AgentA"])
        assert len(vs_ids_1) == 1
        first_vs_id = next(iter(vs_ids_1))

        # Re-create the original folder path and add a brand new file.
        # This simulates a common "hot reload" workflow where the user keeps writing into the original folder name.
        temp_files_folder.mkdir(exist_ok=True)
        new_filename = "new_hot_reload.txt"
        (temp_files_folder / new_filename).write_text("hot reload new content")

        # Second init should reuse the existing VS and upload the new file from the original folder.
        agent_a2, agent_b2 = _create_fresh_agents()
        agency2 = Agency(
            agent_a2,
            shared_files_folder=str(temp_files_folder),
            communication_flows=[(agent_a2, agent_b2)],
        )

        # Confirm we're still using the same VS id
        vs_ids = self._file_search_vector_store_ids(agency2.agents["AgentA"])
        assert vs_ids == {first_vs_id}

        # Confirm the new file is in the vector store by filename
        filenames, _ = self._poll_for_filename(
            agency2.agents["AgentA"],
            first_vs_id,
            new_filename,
            timeout_seconds=30,
        )
        assert new_filename in filenames

    @pytest.mark.asyncio
    @pytest.mark.skipif(
        os.getenv("CI") == "true",
        reason="Requires live OpenAI vector-store ingestion; skipped on CI due to upstream flake.",
    )
    async def test_shared_files_folder_file_search_works(self, temp_files_folder: Path):
        """Agent should be able to search shared files and find content."""
        agent_a, agent_b = _create_fresh_agents()

        agency = Agency(
            agent_a,
            shared_files_folder=str(temp_files_folder),
            communication_flows=[(agent_a, agent_b)],
        )

        vs_id = agent_a._associated_vector_store_id
        assert vs_id is not None

        # Wait until secrets.txt is listed in the vector store.
        filenames, file_ids = self._poll_for_filename(
            agent_a,
            vs_id,
            "secrets.txt",
            timeout_seconds=90,
        )
        assert "secrets.txt" in filenames

        # Wait for every listed file to be reported ready before querying.
        agent_a.file_manager._sync.wait_for_vector_store_files_ready(
            [(vs_id, file_id) for file_id in file_ids],
            timeout_seconds=180.0,
        )

        # Ask about content in the files
        response = await agency.get_response("What is the secret code? Search the files to find it.")

        # The response should contain the secret from secrets.txt
        assert "ALPHA" in response.final_output or "BRAVO" in response.final_output, (
            f"Expected secret code in response, got: {response.final_output}"
        )
class TestSharedToolsEdgeCases:
    """Boundary-condition checks for the shared tool parameters."""

    def test_empty_shared_tools_list(self, basic_agents: tuple[Agent, Agent]):
        """Constructing an Agency with ``shared_tools=[]`` completes without raising."""
        first_agent, second_agent = basic_agents

        agency = Agency(
            first_agent,
            shared_tools=[],
            communication_flows=[(first_agent, second_agent)],
        )

        # Reaching this point means construction did not raise.
        assert agency is not None

    def test_shared_tools_with_duplicate_names_skipped(self, basic_agents: tuple[Agent, Agent]):
        """Passing the same tool twice leaves exactly one tool of that name per agent."""
        first_agent, second_agent = basic_agents

        @function_tool
        def duplicate_tool(x: str) -> str:
            """A tool that will be duplicated."""
            return x

        # The same tool object is listed twice on purpose.
        agency = Agency(
            first_agent,
            shared_tools=[duplicate_tool, duplicate_tool],
            communication_flows=[(first_agent, second_agent)],
        )

        for agent_name in ["AgentA", "AgentB"]:
            count = sum(
                1
                for tool in agency.agents[agent_name].tools
                if getattr(tool, "name", None) == "duplicate_tool"
            )
            assert count == 1, f"{agent_name} should have exactly 1 duplicate_tool, got {count}"

    def test_nonexistent_shared_tools_folder_handled(self, basic_agents: tuple[Agent, Agent], tmp_path: Path):
        """Constructing an Agency with a missing ``shared_tools_folder`` completes without raising."""
        first_agent, second_agent = basic_agents
        missing_folder = tmp_path / "does_not_exist"

        agency = Agency(
            first_agent,
            shared_tools_folder=str(missing_folder),
            communication_flows=[(first_agent, second_agent)],
        )

        # Reaching this point means construction did not raise.
        assert agency is not None
|