test_shared_resources.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556
  1. """Integration tests for Agency shared resources (shared_tools, shared_files_folder, shared_mcp_servers)."""
  2. import os
  3. import sys
  4. import time
  5. from pathlib import Path
  6. from unittest.mock import patch
  7. import pytest
  8. from agents import function_tool
  9. from agents.mcp.server import MCPServerStdio
  10. from pydantic import Field
  11. from agency_swarm import Agency, Agent
  12. from agency_swarm.tools import BaseTool
  13. @pytest.fixture
  14. def temp_tools_folder(tmp_path: Path) -> Path:
  15. """Create a temporary folder with a sample tool file."""
  16. tools_folder = tmp_path / "shared_tools"
  17. tools_folder.mkdir()
  18. tool_file = tools_folder / "SampleTool.py"
  19. tool_file.write_text('''
  20. from agency_swarm.tools import BaseTool
  21. from pydantic import Field
  22. class SampleTool(BaseTool):
  23. """A sample tool for testing shared tools folder."""
  24. message: str = Field(..., description="Message to echo")
  25. def run(self) -> str:
  26. return f"Echo: {self.message}"
  27. ''')
  28. return tools_folder
  29. @pytest.fixture
  30. def temp_files_folder(tmp_path: Path) -> Path:
  31. """Create a temporary folder with sample files for shared files."""
  32. files_folder = tmp_path / "shared_files"
  33. files_folder.mkdir()
  34. # Create sample files for file_search testing
  35. (files_folder / "sample.txt").write_text("This is a sample file for testing shared files.")
  36. (files_folder / "data.md").write_text("# Sample Markdown\n\nSome content here.")
  37. (files_folder / "secrets.txt").write_text("The secret code is ALPHA-BRAVO-CHARLIE-123.")
  38. return files_folder
  39. @pytest.fixture
  40. def basic_agents() -> tuple[Agent, Agent]:
  41. """Create two basic agents for testing."""
  42. agent_a = Agent(
  43. name="AgentA",
  44. instructions="You are Agent A. Use file_search when asked about documents.",
  45. model="gpt-5.4-mini",
  46. )
  47. agent_b = Agent(
  48. name="AgentB",
  49. instructions="You are Agent B. Use file_search when asked about documents.",
  50. model="gpt-5.4-mini",
  51. )
  52. return agent_a, agent_b
  53. def _create_fresh_agents() -> tuple[Agent, Agent]:
  54. """Create fresh agent instances (not cached by fixture)."""
  55. agent_a = Agent(
  56. name="AgentA",
  57. instructions="You are Agent A. Use file_search when asked about documents.",
  58. model="gpt-5.4-mini",
  59. )
  60. agent_b = Agent(
  61. name="AgentB",
  62. instructions="You are Agent B. Use file_search when asked about documents.",
  63. model="gpt-5.4-mini",
  64. )
  65. return agent_a, agent_b
  66. class TestSharedTools:
  67. """Tests for shared_tools parameter."""
  68. def test_shared_function_tool_added_to_all_agents(self, basic_agents: tuple[Agent, Agent]):
  69. """Shared FunctionTool should be added to all agents."""
  70. agent_a, agent_b = basic_agents
  71. @function_tool
  72. def shared_search(query: str) -> str:
  73. """Search for information."""
  74. return f"Results for: {query}"
  75. agency = Agency(
  76. agent_a,
  77. shared_tools=[shared_search],
  78. communication_flows=[(agent_a, agent_b)],
  79. )
  80. # Both agents should have the tool
  81. agent_a_tool_names = [getattr(t, "name", None) for t in agency.agents["AgentA"].tools]
  82. agent_b_tool_names = [getattr(t, "name", None) for t in agency.agents["AgentB"].tools]
  83. assert "shared_search" in agent_a_tool_names
  84. assert "shared_search" in agent_b_tool_names
  85. def test_shared_function_tool_instances_are_independent(self, basic_agents: tuple[Agent, Agent]):
  86. """Each agent should get its own copy of FunctionTool to avoid shared guard state."""
  87. agent_a, agent_b = basic_agents
  88. @function_tool
  89. def shared_tool(x: str) -> str:
  90. """A shared tool."""
  91. return x
  92. original_id = id(shared_tool)
  93. agency = Agency(
  94. agent_a,
  95. shared_tools=[shared_tool],
  96. communication_flows=[(agent_a, agent_b)],
  97. )
  98. # Get the tools from each agent
  99. agent_a_tools = [t for t in agency.agents["AgentA"].tools if getattr(t, "name", None) == "shared_tool"]
  100. agent_b_tools = [t for t in agency.agents["AgentB"].tools if getattr(t, "name", None) == "shared_tool"]
  101. assert len(agent_a_tools) == 1
  102. assert len(agent_b_tools) == 1
  103. # They should be different instances (copied)
  104. assert id(agent_a_tools[0]) != id(agent_b_tools[0])
  105. # And different from the original
  106. assert id(agent_a_tools[0]) != original_id
  107. assert id(agent_b_tools[0]) != original_id
  108. def test_shared_base_tool_class_adapted_for_each_agent(self, basic_agents: tuple[Agent, Agent]):
  109. """BaseTool classes should be adapted to FunctionTool for each agent independently."""
  110. agent_a, agent_b = basic_agents
  111. class SharedBaseTool(BaseTool):
  112. """A shared BaseTool class."""
  113. param: str = Field(..., description="A parameter")
  114. def run(self) -> str:
  115. return f"Result: {self.param}"
  116. agency = Agency(
  117. agent_a,
  118. shared_tools=[SharedBaseTool],
  119. communication_flows=[(agent_a, agent_b)],
  120. )
  121. # Both agents should have the tool
  122. agent_a_tool_names = [getattr(t, "name", None) for t in agency.agents["AgentA"].tools]
  123. agent_b_tool_names = [getattr(t, "name", None) for t in agency.agents["AgentB"].tools]
  124. assert "SharedBaseTool" in agent_a_tool_names
  125. assert "SharedBaseTool" in agent_b_tool_names
  126. class TestSharedToolsFolder:
  127. """Tests for shared_tools_folder parameter."""
  128. def test_tools_loaded_from_folder(self, basic_agents: tuple[Agent, Agent], temp_tools_folder: Path):
  129. """Tools should be loaded from the shared_tools_folder and added to all agents."""
  130. agent_a, agent_b = basic_agents
  131. agency = Agency(
  132. agent_a,
  133. shared_tools_folder=str(temp_tools_folder),
  134. communication_flows=[(agent_a, agent_b)],
  135. )
  136. # Both agents should have the SampleTool
  137. agent_a_tool_names = [getattr(t, "name", None) for t in agency.agents["AgentA"].tools]
  138. agent_b_tool_names = [getattr(t, "name", None) for t in agency.agents["AgentB"].tools]
  139. assert "SampleTool" in agent_a_tool_names
  140. assert "SampleTool" in agent_b_tool_names
  141. class TestSharedFilesFolder:
  142. """Tests for shared_files_folder parameter."""
  143. def test_shared_files_skipped_in_dry_run(self, basic_agents: tuple[Agent, Agent], temp_files_folder: Path):
  144. """Shared files processing should be skipped when DRY_RUN is enabled."""
  145. agent_a, agent_b = basic_agents
  146. with patch.dict(os.environ, {"DRY_RUN": "1"}):
  147. _agency = Agency(
  148. agent_a,
  149. shared_files_folder=str(temp_files_folder),
  150. communication_flows=[(agent_a, agent_b)],
  151. )
  152. # Folder should not be renamed in dry run
  153. assert temp_files_folder.exists()
  154. # No VS-suffixed folder should exist
  155. vs_folders = list(temp_files_folder.parent.glob("shared_files_vs_*"))
  156. assert len(vs_folders) == 0
  157. class TestSharedMCPServers:
  158. """Tests for shared_mcp_servers parameter."""
  159. def test_shared_mcp_servers_added_to_all_agents(self, basic_agents: tuple[Agent, Agent]):
  160. """Shared MCP servers should be converted into tools for all agents."""
  161. agent_a, agent_b = basic_agents
  162. stdio_server_path = str(Path(__file__).parents[2] / "data" / "scripts" / "stdio_server.py")
  163. server_name = f"shared_mcp_stdio_{os.getpid()}"
  164. stdio_server = MCPServerStdio(
  165. name=server_name,
  166. params={
  167. "command": sys.executable,
  168. "args": [stdio_server_path],
  169. },
  170. client_session_timeout_seconds=15,
  171. )
  172. # shared_mcp_servers are attached during Agency init and converted into tools.
  173. agency = Agency(
  174. agent_a,
  175. shared_mcp_servers=[stdio_server],
  176. communication_flows=[(agent_a, agent_b)],
  177. )
  178. agent_a_tool_names = [getattr(t, "name", None) for t in agency.agents["AgentA"].tools]
  179. agent_b_tool_names = [getattr(t, "name", None) for t in agency.agents["AgentB"].tools]
  180. assert "greet" in agent_a_tool_names
  181. assert "add" in agent_a_tool_names
  182. assert "greet" in agent_b_tool_names
  183. assert "add" in agent_b_tool_names
  184. # Conversion clears the server list after creating tools
  185. assert agency.agents["AgentA"].mcp_servers == []
  186. assert agency.agents["AgentB"].mcp_servers == []
  187. class TestFastAPIFactoryPassesSharedParams:
  188. """Tests that agency_factory in FastAPI helpers passes shared parameters."""
  189. def test_agency_factory_includes_shared_params(self, basic_agents: tuple[Agent, Agent], temp_tools_folder: Path):
  190. """The agency_factory closure should include all shared parameters."""
  191. agent_a, agent_b = basic_agents
  192. @function_tool
  193. def shared_tool(x: str) -> str:
  194. """Shared tool."""
  195. return x
  196. with patch.dict(os.environ, {"DRY_RUN": "1"}):
  197. agency = Agency(
  198. agent_a,
  199. shared_tools=[shared_tool],
  200. shared_tools_folder=str(temp_tools_folder),
  201. shared_files_folder=str(temp_tools_folder.parent / "nonexistent"), # Won't be processed in dry run
  202. communication_flows=[(agent_a, agent_b)],
  203. )
  204. # Verify the agency has the shared params stored
  205. assert agency.shared_tools == [shared_tool]
  206. assert agency.shared_tools_folder == str(temp_tools_folder)
  207. assert agency.shared_files_folder == str(temp_tools_folder.parent / "nonexistent")
  208. class TestSharedResourcesIntegration:
  209. """Integration tests combining multiple shared resource types."""
  210. def test_multiple_shared_resources_together(self, basic_agents: tuple[Agent, Agent], temp_tools_folder: Path):
  211. """Multiple shared resource types should work together."""
  212. agent_a, agent_b = basic_agents
  213. @function_tool
  214. def inline_shared_tool(x: str) -> str:
  215. """An inline shared tool."""
  216. return x
  217. with patch.dict(os.environ, {"DRY_RUN": "1"}):
  218. agency = Agency(
  219. agent_a,
  220. shared_tools=[inline_shared_tool],
  221. shared_tools_folder=str(temp_tools_folder),
  222. communication_flows=[(agent_a, agent_b)],
  223. )
  224. # Both agents should have both tools
  225. agent_a_tool_names = [getattr(t, "name", None) for t in agency.agents["AgentA"].tools]
  226. agent_b_tool_names = [getattr(t, "name", None) for t in agency.agents["AgentB"].tools]
  227. # Inline shared tool
  228. assert "inline_shared_tool" in agent_a_tool_names
  229. assert "inline_shared_tool" in agent_b_tool_names
  230. # Tool from folder
  231. assert "SampleTool" in agent_a_tool_names
  232. assert "SampleTool" in agent_b_tool_names
  233. class TestSharedFilesFolderLive:
  234. """Live integration tests for shared_files_folder (requires OpenAI API)."""
  235. def test_shared_files_folder_adds_file_search_to_all_agents(self, temp_files_folder: Path):
  236. """Shared files folder should add FileSearchTool to all agents."""
  237. agent_a, agent_b = _create_fresh_agents()
  238. agency = Agency(
  239. agent_a,
  240. shared_files_folder=str(temp_files_folder),
  241. communication_flows=[(agent_a, agent_b)],
  242. )
  243. # Both agents should have file_search tool
  244. for agent_name in ["AgentA", "AgentB"]:
  245. tool_names = {getattr(t, "name", None) for t in agency.agents[agent_name].tools}
  246. assert "file_search" in tool_names, f"{agent_name} should have file_search tool"
  247. def test_shared_files_folder_same_vector_store_for_all_agents(self, temp_files_folder: Path):
  248. """All agents should share the same vector store ID."""
  249. agent_a, agent_b = _create_fresh_agents()
  250. agency = Agency(
  251. agent_a,
  252. shared_files_folder=str(temp_files_folder),
  253. communication_flows=[(agent_a, agent_b)],
  254. )
  255. # Get vector store IDs from FileSearchTool of each agent
  256. vs_ids = set()
  257. for agent_name in ["AgentA", "AgentB"]:
  258. for tool in agency.agents[agent_name].tools:
  259. if getattr(tool, "name", None) == "file_search":
  260. # FileSearchTool has vector_store_ids attribute
  261. if hasattr(tool, "vector_store_ids"):
  262. vs_ids.update(tool.vector_store_ids)
  263. break
  264. # All agents should use the same vector store
  265. assert len(vs_ids) == 1, f"Expected 1 shared vector store, got {len(vs_ids)}: {vs_ids}"
  266. def test_shared_files_folder_renamed_with_vs_suffix(self, temp_files_folder: Path):
  267. """Folder should be renamed to include vector store ID suffix."""
  268. agent_a, agent_b = _create_fresh_agents()
  269. original_name = temp_files_folder.name # "shared_files"
  270. Agency(
  271. agent_a,
  272. shared_files_folder=str(temp_files_folder),
  273. communication_flows=[(agent_a, agent_b)],
  274. )
  275. # Original folder should be renamed
  276. assert not temp_files_folder.exists(), "Original folder should be renamed"
  277. # Should have a _vs_xxx suffixed folder
  278. vs_folders = list(temp_files_folder.parent.glob(f"{original_name}_vs_*"))
  279. assert len(vs_folders) == 1, f"Expected 1 renamed folder, got {len(vs_folders)}"
  280. def test_shared_files_folder_hot_reload_reuses_vector_store(self, temp_files_folder: Path):
  281. """Second Agency init with same folder should reuse existing vector store."""
  282. original_name = temp_files_folder.name
  283. # First init
  284. agent_a1, agent_b1 = _create_fresh_agents()
  285. _agency1 = Agency(
  286. agent_a1,
  287. shared_files_folder=str(temp_files_folder),
  288. communication_flows=[(agent_a1, agent_b1)],
  289. )
  290. # Find the renamed folder
  291. vs_folders = list(temp_files_folder.parent.glob(f"{original_name}_vs_*"))
  292. assert len(vs_folders) == 1
  293. renamed_folder = vs_folders[0]
  294. # Extract VS ID from folder name
  295. first_vs_id = renamed_folder.name.split("_vs_")[1]
  296. # Second init with original path (simulates hot reload)
  297. agent_a2, agent_b2 = _create_fresh_agents()
  298. _agency2 = Agency(
  299. agent_a2,
  300. shared_files_folder=str(temp_files_folder), # Original path
  301. communication_flows=[(agent_a2, agent_b2)],
  302. )
  303. # Should still be the same folder (no new folders created)
  304. vs_folders_after = list(temp_files_folder.parent.glob(f"{original_name}_vs_*"))
  305. assert len(vs_folders_after) == 1, f"Should still have 1 folder, got {len(vs_folders_after)}"
  306. # Extract VS ID and verify it's the same
  307. second_vs_id = vs_folders_after[0].name.split("_vs_")[1]
  308. assert first_vs_id == second_vs_id, "Vector store ID should be reused"
  309. @pytest.mark.skipif(
  310. os.getenv("CI") == "true",
  311. reason="Requires live OpenAI vector-store ingestion; skipped on CI due to upstream flake.",
  312. )
  313. def test_shared_files_folder_hot_reload_uploads_new_files(self, temp_files_folder: Path):
  314. """New files placed into the original folder path on hot reload should be uploaded."""
  315. original_name = temp_files_folder.name
  316. # First init creates VS folder by renaming original.
  317. agent_a1, agent_b1 = _create_fresh_agents()
  318. agency1 = Agency(
  319. agent_a1,
  320. shared_files_folder=str(temp_files_folder),
  321. communication_flows=[(agent_a1, agent_b1)],
  322. )
  323. # Identify the VS folder and VS id
  324. vs_folders = list(temp_files_folder.parent.glob(f"{original_name}_vs_*"))
  325. assert len(vs_folders) == 1
  326. vs_ids_1 = set()
  327. for tool in agency1.agents["AgentA"].tools:
  328. if getattr(tool, "name", None) == "file_search" and hasattr(tool, "vector_store_ids"):
  329. vs_ids_1.update(tool.vector_store_ids)
  330. break
  331. assert len(vs_ids_1) == 1
  332. first_vs_id = next(iter(vs_ids_1))
  333. # Re-create the original folder path and add a brand new file.
  334. # This simulates a common "hot reload" workflow where the user keeps writing into the original folder name.
  335. temp_files_folder.mkdir(exist_ok=True)
  336. new_filename = "new_hot_reload.txt"
  337. (temp_files_folder / new_filename).write_text("hot reload new content")
  338. # Second init should reuse the existing VS and upload the new file from the original folder.
  339. agent_a2, agent_b2 = _create_fresh_agents()
  340. agency2 = Agency(
  341. agent_a2,
  342. shared_files_folder=str(temp_files_folder),
  343. communication_flows=[(agent_a2, agent_b2)],
  344. )
  345. # Confirm we're still using the same VS id
  346. vs_ids = set()
  347. for tool in agency2.agents["AgentA"].tools:
  348. if getattr(tool, "name", None) == "file_search" and hasattr(tool, "vector_store_ids"):
  349. vs_ids.update(tool.vector_store_ids)
  350. break
  351. assert vs_ids == {first_vs_id}
  352. # Confirm the new file is in the vector store by filename
  353. deadline = time.monotonic() + 30
  354. filenames: list[str] = []
  355. while time.monotonic() < deadline:
  356. vs_files = agency2.agents["AgentA"].client_sync.vector_stores.files.list(vector_store_id=first_vs_id)
  357. file_ids = [vf.id for vf in vs_files.data]
  358. filenames = [agency2.agents["AgentA"].client_sync.files.retrieve(fid).filename for fid in file_ids]
  359. if new_filename in filenames:
  360. break
  361. time.sleep(1)
  362. assert new_filename in filenames
  363. @pytest.mark.asyncio
  364. @pytest.mark.skipif(
  365. os.getenv("CI") == "true",
  366. reason="Requires live OpenAI vector-store ingestion; skipped on CI due to upstream flake.",
  367. )
  368. async def test_shared_files_folder_file_search_works(self, temp_files_folder: Path):
  369. """Agent should be able to search shared files and find content."""
  370. agent_a, agent_b = _create_fresh_agents()
  371. agency = Agency(
  372. agent_a,
  373. shared_files_folder=str(temp_files_folder),
  374. communication_flows=[(agent_a, agent_b)],
  375. )
  376. vs_id = agent_a._associated_vector_store_id
  377. assert vs_id is not None
  378. deadline = time.monotonic() + 90
  379. filenames: list[str] = []
  380. while time.monotonic() < deadline:
  381. vs_files = agent_a.client_sync.vector_stores.files.list(vector_store_id=vs_id)
  382. file_ids = [vf.id for vf in vs_files.data]
  383. filenames = [agent_a.client_sync.files.retrieve(fid).filename for fid in file_ids]
  384. if "secrets.txt" in filenames:
  385. break
  386. time.sleep(1)
  387. assert "secrets.txt" in filenames
  388. agent_a.file_manager._sync.wait_for_vector_store_files_ready(
  389. [(vs_id, file_id) for file_id in file_ids],
  390. timeout_seconds=180.0,
  391. )
  392. # Ask about content in the files
  393. response = await agency.get_response("What is the secret code? Search the files to find it.")
  394. # The response should contain the secret from secrets.txt
  395. assert "ALPHA" in response.final_output or "BRAVO" in response.final_output, (
  396. f"Expected secret code in response, got: {response.final_output}"
  397. )
  398. class TestSharedToolsEdgeCases:
  399. """Edge case tests for shared tools."""
  400. def test_empty_shared_tools_list(self, basic_agents: tuple[Agent, Agent]):
  401. """Empty shared_tools list should not cause errors."""
  402. agent_a, agent_b = basic_agents
  403. agency = Agency(
  404. agent_a,
  405. shared_tools=[],
  406. communication_flows=[(agent_a, agent_b)],
  407. )
  408. # Agency should initialize without errors
  409. assert agency is not None
  410. def test_shared_tools_with_duplicate_names_skipped(self, basic_agents: tuple[Agent, Agent]):
  411. """Duplicate tool names should be skipped with warning."""
  412. agent_a, agent_b = basic_agents
  413. @function_tool
  414. def duplicate_tool(x: str) -> str:
  415. """A tool that will be duplicated."""
  416. return x
  417. # Add the same tool twice
  418. agency = Agency(
  419. agent_a,
  420. shared_tools=[duplicate_tool, duplicate_tool],
  421. communication_flows=[(agent_a, agent_b)],
  422. )
  423. # Should only have one instance of the tool per agent
  424. for agent_name in ["AgentA", "AgentB"]:
  425. tool_names = [getattr(t, "name", None) for t in agency.agents[agent_name].tools]
  426. count = tool_names.count("duplicate_tool")
  427. assert count == 1, f"{agent_name} should have exactly 1 duplicate_tool, got {count}"
  428. def test_nonexistent_shared_tools_folder_handled(self, basic_agents: tuple[Agent, Agent], tmp_path: Path):
  429. """Nonexistent shared_tools_folder should be handled gracefully."""
  430. agent_a, agent_b = basic_agents
  431. nonexistent = tmp_path / "does_not_exist"
  432. # Should not raise, just warn
  433. agency = Agency(
  434. agent_a,
  435. shared_tools_folder=str(nonexistent),
  436. communication_flows=[(agent_a, agent_b)],
  437. )
  438. assert agency is not None